Graph: Update Python code to follow PEP-8 - Changed indentation to use spaces instead of tabs - Updated to PEP-8 guidelines - Updated to follow style guide conventions - Refactored a few functions for cleaner code and design
Closes #148 Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/d487df3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/d487df3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/d487df3c Branch: refs/heads/master Commit: d487df3c46f984fad6c15b8a1d3e50dc30f6b6f6 Parents: 8c9b955 Author: Rahul Iyer <ri...@apache.org> Authored: Fri Jul 7 22:23:18 2017 -0700 Committer: Rahul Iyer <ri...@apache.org> Committed: Thu Jul 20 14:23:22 2017 -0700 ---------------------------------------------------------------------- src/ports/postgres/modules/graph/apsp.py_in | 1294 +++++++++--------- .../postgres/modules/graph/graph_utils.py_in | 170 +-- src/ports/postgres/modules/graph/pagerank.py_in | 916 +++++++------ src/ports/postgres/modules/graph/sssp.py_in | 1125 +++++++-------- src/ports/postgres/modules/graph/wcc.py_in | 119 +- .../postgres/modules/utilities/utilities.py_in | 45 +- 6 files changed, 1842 insertions(+), 1827 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/apsp.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/apsp.py_in b/src/ports/postgres/modules/graph/apsp.py_in index 1331301..ab8a566 100644 --- a/src/ports/postgres/modules/graph/apsp.py_in +++ b/src/ports/postgres/modules/graph/apsp.py_in @@ -38,17 +38,16 @@ from utilities.utilities import _assert from utilities.utilities import extract_keyvalue_params from utilities.utilities import unique_string from utilities.utilities import split_quoted_delimited_str -from utilities.validate_args import get_cols +from utilities.utilities import is_platform_pg, is_platform_hawq from utilities.validate_args import table_exists from utilities.validate_args import columns_exist_in_table from 
utilities.validate_args import table_is_empty from utilities.validate_args import get_expr_type -m4_changequote(`<!', `!>') def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table, - edge_args, out_table, grouping_cols, **kwargs): - """ + edge_args, out_table, grouping_cols, **kwargs): + """ All Pairs shortest path function for graphs using the matrix multiplication based algorithm [1]. [1] http://users.cecs.anu.edu.au/~Alistair.Rendell/Teaching/apac_comp3600/module4/all_pairs_shortest_paths.xhtml @@ -57,655 +56,656 @@ def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table, @param vertex_id Name of the column containing the vertex ids. @param edge_table Name of the table that contains the edge data. @param edge_args A comma-delimited string containing multiple - named arguments of the form "name=value". - @param out_table Name of the table to store the result of APSP. + named arguments of the form "name=value". + @param out_table Name of the table to store the result of APSP. @param grouping_cols The list of grouping columns. 
""" + with MinWarning("warning"): + + INT_MAX = 2147483647 + INFINITY = "'Infinity'" + EPSILON = 0.000001 + + params_types = {'src': str, 'dest': str, 'weight': str} + default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'} + edge_params = extract_keyvalue_params(edge_args, params_types, default_args) + + # Prepare the input for recording in the summary table + if vertex_id is None: + v_st = "NULL" + vertex_id = "id" + else: + v_st = vertex_id + if edge_args is None: + e_st = "NULL" + else: + e_st = edge_args + if grouping_cols is None: + g_st = "NULL" + glist = None + else: + g_st = grouping_cols + glist = split_quoted_delimited_str(grouping_cols) + + src = edge_params["src"] + dest = edge_params["dest"] + weight = edge_params["weight"] + + distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(src) + is_hawq = is_platform_hawq() + + _validate_apsp(vertex_table, vertex_id, edge_table, + edge_params, out_table, glist) + + out_table_1 = unique_string(desp='out_table_1') + out_table_2 = unique_string(desp='out_table_2') + tmp_view = unique_string(desp='tmp_view') + v1 = unique_string(desp='v1') + v2 = unique_string(desp='v2') + message = unique_string(desp='message') + + # Initialize grouping related variables + comma_grp = "" + comma_grp_e = "" + comma_grp_m = "" + grp_comma = "" + grp_v1_comma = "" + grp_o1_comma = "" + grp_o_comma = "" + checkg_eo = "" + checkg_eout = "" + checkg_ex = "" + checkg_om = "" + checkg_o1t_sub = "" + checkg_ot_sub = "" + checkg_ot = "" + checkg_o1t = "" + checkg_vv = "" + checkg_o2v = "" + checkg_oy = "" + checkg_vv_sub = "TRUE" + grp_by = "" + + if grouping_cols is not None: + + # We use actual table names in some cases and aliases in others + # In some cases, we swap the table names so use of an alias is + # necessary. In other cases, they are used to simplify debugging. 
+ + comma_grp = " , " + grouping_cols + comma_grp_e = " , " + _grp_from_table("edge", glist) + comma_grp_m = " , " + _grp_from_table(message, glist) + grp_comma = grouping_cols + " , " + grp_v1_comma = _grp_from_table("v1", glist) + " , " + grp_o1_comma = _grp_from_table(out_table_1, glist) + " , " + grp_o_comma = _grp_from_table("out", glist) + " , " + + checkg_eo = " AND " + _check_groups(edge_table, out_table, glist) + checkg_eout = " AND " + _check_groups("edge", "out", glist) + checkg_ex = " AND " + _check_groups("edge", "x", glist) + checkg_om = " AND " + _check_groups(out_table, message, glist) + checkg_o1t_sub = _check_groups("out", tmp_view, glist) + checkg_ot_sub = _check_groups(out_table, tmp_view, glist) + checkg_ot = " AND " + _check_groups(out_table, tmp_view, glist) + checkg_o1t = " AND " + _check_groups("out", "t", glist) + checkg_vv = " AND " + _check_groups("v1", "v2", glist) + checkg_o2v = " AND " + _check_groups(out_table_2, "v2", glist) + checkg_oy = " AND " + _check_groups("out", "y", glist) + checkg_vv_sub = _check_groups("v1", "v2", glist) + grp_by = " GROUP BY " + grouping_cols + + w_type = get_expr_type(weight, edge_table).lower() + init_w = INT_MAX + if w_type in ['real', 'double precision', 'float8']: + init_w = INFINITY + + # We keep a summary table to keep track of the parameters used for this + # APSP run. This table is used in the path finding function to eliminate + # the need for repetition. 
+ plpy.execute(""" CREATE TABLE {out_table}_summary ( + vertex_table TEXT, + vertex_id TEXT, + edge_table TEXT, + edge_args TEXT, + out_table TEXT, + grouping_cols TEXT) + """.format(**locals())) + plpy.execute(""" INSERT INTO {out_table}_summary VALUES + ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}', + '{out_table}', '{g_st}') """.format(**locals())) + + plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) + + # Find all of the vertices involved with a given group + plpy.execute(""" CREATE VIEW {tmp_view} AS + SELECT {src} AS {vertex_id} {comma_grp} + FROM {edge_table} WHERE {src} IS NOT NULL + UNION + SELECT {dest} AS {vertex_id} {comma_grp} + FROM {edge_table} WHERE {dest} IS NOT NULL + """.format(**locals())) + + # Don't use the unnecessary rows of the output table during joins. + ot_sql = """ SELECT * FROM {out_table} + WHERE {weight} != {init_w} AND {src} != {dest} """ + + # HAWQ does not support UPDATE so the initialization has to differ. + if is_hawq: + + plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format( + out_table_1, out_table_2)) + # Create 2 identical tables to swap at every iteration. + plpy.execute(""" CREATE TABLE {out_table_1} AS + SELECT {grp_comma} {src},{dest},{weight}, NULL::INT AS parent + FROM {edge_table} LIMIT 0 {distribution} + """.format(**locals())) + plpy.execute(""" CREATE TABLE {out_table_2} AS + SELECT * FROM {out_table_1} LIMIT 0 {distribution} + """.format(**locals())) + + # The source can be reached with 0 cost and next is itself. 
+ plpy.execute(""" INSERT INTO {out_table_2} + SELECT {grp_comma} {vertex_id} AS {src}, {vertex_id} AS {dest}, + 0 AS {weight}, {vertex_id} AS parent + FROM {tmp_view} """.format(**locals())) + # Distance = 1: every edge means there is a path from src to dest + plpy.execute(""" INSERT INTO {out_table_2} + SELECT {grp_comma} {src}, {dest}, {weight}, {dest} AS parent + FROM {edge_table} """.format(**locals())) + + # Fill the rest of the possible pairs with infinite initial weights + fill_sql = """ INSERT INTO {out_table_1} + SELECT {grp_v1_comma} + v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest}, + {init_w} AS {weight}, NULL::INT AS parent + FROM {tmp_view} v1, {tmp_view} v2 + WHERE NOT EXISTS + (SELECT 1 FROM {out_table_2} + WHERE v1.{vertex_id} = {src} AND + v2.{vertex_id} = {dest} + {checkg_vv} {checkg_o2v}) + {checkg_vv} + UNION + SELECT * FROM {out_table_2} + """.format(**locals()) + plpy.execute(fill_sql) + + ot_sql1 = ot_sql.format(out_table=out_table_1, init_w=init_w, + weight=weight, src=src, dest=dest) + ot_sql2 = ot_sql.format(out_table=out_table_2, init_w=init_w, + weight=weight, src=src, dest=dest) + + # PostgreSQL & GPDB initialization + else: + + plpy.execute(""" CREATE TABLE {out_table} AS + (SELECT {grp_comma} {src}, {dest}, {weight}, + {src} AS parent FROM {edge_table} LIMIT 0) + {distribution} """.format(**locals())) + + plpy.execute(""" INSERT INTO {out_table} + SELECT {grp_v1_comma} + v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest}, + {init_w} AS {weight}, NULL::INT AS parent + FROM + {tmp_view} AS v1 INNER JOIN + {tmp_view} AS v2 ON ({checkg_vv_sub}) + """.format(**locals())) + plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) + + # GPDB and HAWQ have distributed by clauses to help them with indexing. + # For Postgres we add the indices manually. 
+ if is_platform_pg(): + sql_index = "CREATE INDEX ON {0}({1})".format(out_table, src) + else: + sql_index = '' + + plpy.execute(sql_index) + + # The source can be reached with 0 cost and next is itself. + plpy.execute( + """ UPDATE {out_table} SET + {weight} = 0, parent = {vertex_id} + FROM {vertex_table} + WHERE {out_table}.{src} = {vertex_id} + AND {out_table}.{dest} = {vertex_id} + """.format(**locals())) + + # Distance = 1: every edge means there is a path from src to dest + + # There may be multiple edges defined as a->b, + # we only need the minimum weighted one. + + plpy.execute( + """ CREATE VIEW {tmp_view} AS + SELECT {grp_comma} {src}, {dest}, + min({weight}) AS {weight} + FROM {edge_table} + GROUP BY {grp_comma} {src}, {dest} + """.format(**locals())) + plpy.execute( + """ UPDATE {out_table} SET + {weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest} + FROM {tmp_view} + WHERE {out_table}.{src} = {tmp_view}.{src} + AND {out_table}.{dest} = {tmp_view}.{dest} + AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot} + """.format(**locals())) + plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) + + ot_sql1 = ot_sql.format(**locals()) + out_table_1 = out_table + + # Find the maximum number of iterations to try + # If not done by v_cnt iterations, there is a negative cycle. + v_cnt = plpy.execute( + """ SELECT max(count) as max FROM ( + SELECT count(DISTINCT {src}) AS count + FROM {out_table_1} + {grp_by}) x + """.format(**locals()))[0]['max'] + + if v_cnt < 2: + plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) + plpy.execute("DROP TABLE IF EXISTS {0},{1}". 
+ format(out_table, out_table + "_summary")) + if is_hawq: + plpy.execute("DROP TABLE IF EXISTS {0},{1}".format( + out_table_1, out_table_2)) + if grouping_cols: + plpy.error(("Graph APSP: {0} has less than 2 vertices in " + + "every group.").format(edge_table)) + else: + plpy.error("Graph APSP: {0} has less than 2 vertices.".format( + vertex_table)) + + for i in range(0, v_cnt + 1): + + """ + Create a view that will be used to update the output table + + The implementation is based on the matrix multiplication idea. + The initial output table consists of 3 sets of vertex pairs. + 1) for every vervex v: v -> v, weight 0, parent v + 2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2 + 3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf', + parent NULL + + The algorithm "relaxes" the paths: finds alternate paths with less + weights + At every step, we look at every combination of non-infinite + existing paths and edges to see if we can relax a path. + + Assume the graph is a chain: 1->2->3->... + The initial output table will have a finite weighted path for 1->2 + and infinite for the rest. At ith iteration, the output table will + have 1->i path and will relax 1->i+1 path from infinite to a + finite value (weight of 1->i path + weight of i->i+1 edge) and + assign i as the parent. + + Since using '=' with floats is dangerous we use an epsilon value + for comparison. 
+ """ + + plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) + update_sql = """ CREATE VIEW {tmp_view} AS + SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest}) + {grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent + FROM {out_table_1} AS out + INNER JOIN + (SELECT x.{src}, x.{dest}, x.{weight}, + out.{dest} as parent {comma_grp_e} + FROM + ({ot_sql1}) AS out + INNER JOIN + {edge_table} AS edge + ON (out.{dest} = edge.{src} {checkg_eout}) + INNER JOIN + (SELECT out.{src}, edge.{dest}, + min(out.{weight}+edge.{weight}) AS {weight} + {comma_grp_e} + FROM + ({ot_sql1}) AS out, + {edge_table} AS edge + WHERE out.{dest} = edge.{src} {checkg_eout} + GROUP BY out.{src},edge.{dest} {comma_grp_e}) x + ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} {checkg_ex}) + WHERE ABS(out.{weight}+edge.{weight} - x.{weight}) + < {EPSILON}) y + ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy}) + WHERE y.{weight} < out.{weight} + """.format(**locals()) + plpy.execute(update_sql) + + # HAWQ employs alternating tables and has to be handled separately + if is_hawq: + + # Stop if therea re no more updates + if table_is_empty(tmp_view): + + plpy.execute("DROP VIEW {0}".format(tmp_view)) + plpy.execute("ALTER TABLE {0} RENAME TO {1}".format( + out_table_1, out_table)) + break + + # The new output table will still have the old values for + # every vertex pair that does not appear on the update view. 
+ + plpy.execute("TRUNCATE TABLE {0}".format(out_table_2)) + fill_sql = """ INSERT INTO {out_table_2} + SELECT * FROM {out_table_1} AS out WHERE NOT EXISTS + (SELECT 1 FROM {tmp_view} AS t + WHERE t.{src} = out.{src} AND + t.{dest} = out.{dest} {checkg_o1t}) + UNION + SELECT * FROM {tmp_view}""".format(**locals()) + plpy.execute(fill_sql) + + # Swap the table names and the sql command we use for filtering + tmpname = out_table_1 + out_table_1 = out_table_2 + out_table_2 = tmpname + + tmpname = ot_sql1 + ot_sql1 = ot_sql2 + ot_sql2 = tmpname + + else: + + updates = plpy.execute( + """ UPDATE {out_table} + SET {weight} = {tmp_view}.{weight}, + parent = {tmp_view}.parent + FROM {tmp_view} + WHERE {tmp_view}.{src} = {out_table}.{src} AND + {tmp_view}.{dest} = {out_table}.{dest} {checkg_ot} + """.format(**locals())) + if updates.nrows() == 0: + break + + # The algorithm should have reached a break command by this point. + # This check handles the existence of a negative cycle. + if i == v_cnt: + + # If there are no groups, clean up and give error. + if grouping_cols is None: + plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) + plpy.execute("DROP TABLE IF EXISTS {0},{1}". + format(out_table, out_table + "_summary")) + if is_hawq: + plpy.execute("DROP TABLE IF EXISTS {0},{1}".format( + out_table_1, out_table_2)) + plpy.error("Graph APSP: Detected a negative cycle in the graph.") + + # It is possible that not all groups has negative cycles. + else: + # negs is the string created by collating grouping columns. + # By looking at the update view, we can see which groups + # are in a negative cycle. + + negs = plpy.execute( + """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp + FROM {tmp_view} + """.format(**locals()))[0]['grp'] + + # Delete the groups with negative cycles from the output table. + + # HAWQ doesn't support DELETE so we have to copy the valid results. 
+ if is_hawq: + sql_del = """ CREATE TABLE {out_table} AS + SELECT * + FROM {out_table_1} AS out + WHERE NOT EXISTS( + SELECT 1 + FROM {tmp_view} + WHERE {checkg_o1t_sub} + );""" + plpy.execute(sql_del.format(**locals())) + plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) + plpy.execute("DROP TABLE IF EXISTS {0},{1}".format( + out_table_1, out_table_2)) + else: + sql_del = """ DELETE FROM {out_table} + USING {tmp_view} + WHERE {checkg_ot_sub}""" + plpy.execute(sql_del.format(**locals())) + + plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) + + # If every group has a negative cycle, + # drop the output table as well. + if table_is_empty(out_table): + plpy.execute("DROP TABLE IF EXISTS {0},{1}". + format(out_table, out_table + "_summary")) + plpy.warning( + """Graph APSP: Detected a negative cycle in the """ + + """sub-graphs of following groups: {0}.""". + format(str(negs)[1:-1])) + + else: + plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) + if is_hawq: + plpy.execute("DROP TABLE IF EXISTS {0}".format(out_table_2)) + + return None - with MinWarning("warning"): - - INT_MAX = 2147483647 - INFINITY = "'Infinity'" - EPSILON = 0.000001 - - params_types = {'src': str, 'dest': str, 'weight': str} - default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'} - edge_params = extract_keyvalue_params(edge_args, - params_types, - default_args) - - # Prepare the input for recording in the summary table - if vertex_id is None: - v_st= "NULL" - vertex_id = "id" - else: - v_st = vertex_id - if edge_args is None: - e_st = "NULL" - else: - e_st = edge_args - if grouping_cols is None: - g_st = "NULL" - glist = None - else: - g_st = grouping_cols - glist = split_quoted_delimited_str(grouping_cols) - - src = edge_params["src"] - dest = edge_params["dest"] - weight = edge_params["weight"] - - distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, - <!"DISTRIBUTED BY ({0})".format(src)!>) - - is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>) - - 
_validate_apsp(vertex_table, vertex_id, edge_table, - edge_params, out_table, glist) - - out_table_1 = unique_string(desp='out_table_1') - out_table_2 = unique_string(desp='out_table_2') - tmp_view = unique_string(desp='tmp_view') - v1 = unique_string(desp='v1') - v2 = unique_string(desp='v2') - message = unique_string(desp='message') - - # Initialize grouping related variables - comma_grp = "" - comma_grp_e = "" - comma_grp_m = "" - grp_comma = "" - grp_v1_comma = "" - grp_o1_comma = "" - grp_o_comma = "" - checkg_eo = "" - checkg_eout = "" - checkg_ex = "" - checkg_om = "" - checkg_o1t_sub = "" - checkg_ot_sub = "" - checkg_ot = "" - checkg_o1t = "" - checkg_vv = "" - checkg_o2v = "" - checkg_oy = "" - checkg_vv_sub = "TRUE" - grp_by = "" - - if grouping_cols is not None: - - # We use actual table names in some cases and aliases in others - # In some cases, we swap the table names so use of an alias is - # necessary. In other cases, they are used to simplify debugging. - - comma_grp = " , " + grouping_cols - comma_grp_e = " , " + _grp_from_table("edge",glist) - comma_grp_m = " , " + _grp_from_table(message,glist) - grp_comma = grouping_cols + " , " - grp_v1_comma = _grp_from_table("v1",glist) + " , " - grp_o1_comma = _grp_from_table(out_table_1,glist) + " , " - grp_o_comma = _grp_from_table("out",glist) + " , " - - checkg_eo = " AND " + _check_groups(edge_table,out_table,glist) - checkg_eout = " AND " + _check_groups("edge","out",glist) - checkg_ex = " AND " + _check_groups("edge","x",glist) - checkg_om = " AND " + _check_groups(out_table,message,glist) - checkg_o1t_sub = _check_groups("out",tmp_view,glist) - checkg_ot_sub = _check_groups(out_table,tmp_view,glist) - checkg_ot = " AND " + _check_groups(out_table,tmp_view,glist) - checkg_o1t = " AND " + _check_groups("out","t",glist) - checkg_vv = " AND " + _check_groups("v1","v2",glist) - checkg_o2v = " AND " + _check_groups(out_table_2,"v2",glist) - checkg_oy = " AND " + _check_groups("out","y",glist) - 
checkg_vv_sub = _check_groups("v1","v2",glist) - grp_by = " GROUP BY " + grouping_cols - - w_type = get_expr_type(weight,edge_table).lower() - init_w = INT_MAX - if w_type in ['real','double precision','float8']: - init_w = INFINITY - - # We keep a summary table to keep track of the parameters used for this - # APSP run. This table is used in the path finding function to eliminate - # the need for repetition. - plpy.execute( """ CREATE TABLE {out_table}_summary ( - vertex_table TEXT, - vertex_id TEXT, - edge_table TEXT, - edge_args TEXT, - out_table TEXT, - grouping_cols TEXT) - """.format(**locals())) - plpy.execute( """ INSERT INTO {out_table}_summary VALUES - ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}', - '{out_table}', '{g_st}') """.format(**locals())) - - plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) - - # Find all of the vertices involved with a given group - plpy.execute(""" CREATE VIEW {tmp_view} AS - SELECT {src} AS {vertex_id} {comma_grp} - FROM {edge_table} WHERE {src} IS NOT NULL - UNION - SELECT {dest} AS {vertex_id} {comma_grp} - FROM {edge_table} WHERE {dest} IS NOT NULL - """.format(**locals())) - - # Don't use the unnecessary rows of the output table during joins. - ot_sql = """ SELECT * FROM {out_table} - WHERE {weight} != {init_w} AND {src} != {dest} """ - - # HAWQ does not support UPDATE so the initialization has to differ. - if is_hawq: - - plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format( - out_table_1,out_table_2)) - # Create 2 identical tables to swap at every iteration. - plpy.execute(""" CREATE TABLE {out_table_1} AS - SELECT {grp_comma} {src},{dest},{weight}, NULL::INT AS parent - FROM {edge_table} LIMIT 0 {distribution} - """.format(**locals())) - plpy.execute(""" CREATE TABLE {out_table_2} AS - SELECT * FROM {out_table_1} LIMIT 0 {distribution} - """.format(**locals())) - - # The source can be reached with 0 cost and next is itself. 
- plpy.execute(""" INSERT INTO {out_table_2} - SELECT {grp_comma} {vertex_id} AS {src}, {vertex_id} AS {dest}, - 0 AS {weight}, {vertex_id} AS parent - FROM {tmp_view} """.format(**locals())) - # Distance = 1: every edge means there is a path from src to dest - plpy.execute(""" INSERT INTO {out_table_2} - SELECT {grp_comma} {src}, {dest}, {weight}, {dest} AS parent - FROM {edge_table} """.format(**locals())) - - # Fill the rest of the possible pairs with infinite initial weights - fill_sql = """ INSERT INTO {out_table_1} - SELECT {grp_v1_comma} - v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest}, - {init_w} AS {weight}, NULL::INT AS parent - FROM {tmp_view} v1, {tmp_view} v2 - WHERE NOT EXISTS - (SELECT 1 FROM {out_table_2} - WHERE v1.{vertex_id} = {src} AND - v2.{vertex_id} = {dest} - {checkg_vv} {checkg_o2v}) - {checkg_vv} - UNION - SELECT * FROM {out_table_2} - """.format(**locals()) - plpy.execute(fill_sql) - - ot_sql1 = ot_sql.format(out_table = out_table_1, init_w = init_w, - weight = weight, src = src, dest = dest) - ot_sql2 = ot_sql.format(out_table = out_table_2, init_w = init_w, - weight = weight, src = src, dest = dest) - - # PostgreSQL & GPDB initialization - else: - - plpy.execute( """ CREATE TABLE {out_table} AS - (SELECT {grp_comma} {src}, {dest}, {weight}, - {src} AS parent FROM {edge_table} LIMIT 0) - {distribution} """.format(**locals())) - - plpy.execute( """ INSERT INTO {out_table} - SELECT {grp_v1_comma} - v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest}, - {init_w} AS {weight}, NULL::INT AS parent - FROM - {tmp_view} AS v1 INNER JOIN - {tmp_view} AS v2 ON ({checkg_vv_sub}) - """.format(**locals())) - plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) - - # GPDB and HAWQ have distributed by clauses to help them with indexing. - # For Postgres we add the indices manually. 
- sql_index = m4_ifdef(<!__POSTGRESQL__!>, - <!""" CREATE INDEX ON {out_table} ({src}); - """.format(**locals())!>, <!''!>) - plpy.execute(sql_index) - - # The source can be reached with 0 cost and next is itself. - plpy.execute( - """ UPDATE {out_table} SET - {weight} = 0, parent = {vertex_id} - FROM {vertex_table} - WHERE {out_table}.{src} = {vertex_id} - AND {out_table}.{dest} = {vertex_id} - """.format(**locals())) - - # Distance = 1: every edge means there is a path from src to dest - - # There may be multiple edges defined as a->b, - # we only need the minimum weighted one. - - plpy.execute( - """ CREATE VIEW {tmp_view} AS - SELECT {grp_comma} {src}, {dest}, - min({weight}) AS {weight} - FROM {edge_table} - GROUP BY {grp_comma} {src}, {dest} - """.format(**locals())) - plpy.execute( - """ UPDATE {out_table} SET - {weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest} - FROM {tmp_view} - WHERE {out_table}.{src} = {tmp_view}.{src} - AND {out_table}.{dest} = {tmp_view}.{dest} - AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot} - """.format(**locals())) - plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) - - ot_sql1 = ot_sql.format(**locals()) - out_table_1 = out_table - - # Find the maximum number of iterations to try - # If not done by v_cnt iterations, there is a negative cycle. - v_cnt = plpy.execute( - """ SELECT max(count) as max FROM ( - SELECT count(DISTINCT {src}) AS count - FROM {out_table_1} - {grp_by}) x - """.format(**locals()))[0]['max'] - - if v_cnt < 2: - plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) - plpy.execute("DROP TABLE IF EXISTS {0},{1}". 
- format(out_table, out_table+"_summary")) - if is_hawq: - plpy.execute("DROP TABLE IF EXISTS {0},{1}".format( - out_table_1,out_table_2)) - if grouping_cols: - plpy.error(("Graph APSP: {0} has less than 2 vertices in "+ - "every group.").format(edge_table)) - else: - plpy.error("Graph APSP: {0} has less than 2 vertices.".format( - vertex_table)) - - for i in range(0,v_cnt+1): - - """ - Create a view that will be used to update the output table - - The implementation is based on the matrix multiplication idea. - The initial output table consists of 3 sets of vertex pairs. - 1) for every vervex v: v -> v, weight 0, parent v - 2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2 - 3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf', - parent NULL - - The algorithm "relaxes" the paths: finds alternate paths with less - weights - At every step, we look at every combination of non-infinite - existing paths and edges to see if we can relax a path. - - Assume the graph is a chain: 1->2->3->... - The initial output table will have a finite weighted path for 1->2 - and infinite for the rest. At ith iteration, the output table will - have 1->i path and will relax 1->i+1 path from infinite to a - finite value (weight of 1->i path + weight of i->i+1 edge) and - assign i as the parent. - - Since using '=' with floats is dangerous we use an epsilon value - for comparison. 
- """ - - plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) - update_sql = """ CREATE VIEW {tmp_view} AS - SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest}) - {grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent - FROM {out_table_1} AS out - INNER JOIN - (SELECT x.{src}, x.{dest}, x.{weight}, - out.{dest} as parent {comma_grp_e} - FROM - ({ot_sql1}) AS out - INNER JOIN - {edge_table} AS edge - ON (out.{dest} = edge.{src} {checkg_eout}) - INNER JOIN - (SELECT out.{src}, edge.{dest}, - min(out.{weight}+edge.{weight}) AS {weight} - {comma_grp_e} - FROM - ({ot_sql1}) AS out, - {edge_table} AS edge - WHERE out.{dest} = edge.{src} {checkg_eout} - GROUP BY out.{src},edge.{dest} {comma_grp_e}) x - ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} {checkg_ex}) - WHERE ABS(out.{weight}+edge.{weight} - x.{weight}) - < {EPSILON}) y - ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy}) - WHERE y.{weight} < out.{weight} - """.format(**locals()) - plpy.execute(update_sql) - - # HAWQ employs alternating tables and has to be handled separately - if is_hawq: - - # Stop if therea re no more updates - if table_is_empty(tmp_view): - - plpy.execute("DROP VIEW {0}".format(tmp_view)) - plpy.execute("ALTER TABLE {0} RENAME TO {1}".format( - out_table_1,out_table)) - break - - # The new output table will still have the old values for - # every vertex pair that does not appear on the update view. 
- - plpy.execute("TRUNCATE TABLE {0}".format(out_table_2)) - fill_sql = """ INSERT INTO {out_table_2} - SELECT * FROM {out_table_1} AS out WHERE NOT EXISTS - (SELECT 1 FROM {tmp_view} AS t - WHERE t.{src} = out.{src} AND - t.{dest} = out.{dest} {checkg_o1t}) - UNION - SELECT * FROM {tmp_view}""".format(**locals()) - plpy.execute(fill_sql) - - # Swap the table names and the sql command we use for filtering - tmpname = out_table_1 - out_table_1 = out_table_2 - out_table_2 = tmpname - - tmpname = ot_sql1 - ot_sql1 = ot_sql2 - ot_sql2 = tmpname - - else: - - updates = plpy.execute( - """ UPDATE {out_table} - SET {weight} = {tmp_view}.{weight}, - parent = {tmp_view}.parent - FROM {tmp_view} - WHERE {tmp_view}.{src} = {out_table}.{src} AND - {tmp_view}.{dest} = {out_table}.{dest} {checkg_ot} - """.format(**locals())) - if updates.nrows() == 0: - break - - # The algorithm should have reached a break command by this point. - # This check handles the existence of a negative cycle. - if i == v_cnt: - - # If there are no groups, clean up and give error. - if grouping_cols is None: - plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) - plpy.execute("DROP TABLE IF EXISTS {0},{1}". - format(out_table, out_table+"_summary")) - if is_hawq: - plpy.execute("DROP TABLE IF EXISTS {0},{1}".format( - out_table_1,out_table_2)) - plpy.error("Graph APSP: Detected a negative cycle in the graph.") - - # It is possible that not all groups has negative cycles. - else: - # negs is the string created by collating grouping columns. - # By looking at the update view, we can see which groups - # are in a negative cycle. - - negs = plpy.execute( - """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp - FROM {tmp_view} - """.format(**locals()))[0]['grp'] - - # Delete the groups with negative cycles from the output table. - - # HAWQ doesn't support DELETE so we have to copy the valid results. 
- if is_hawq: - sql_del = """ CREATE TABLE {out_table} AS - SELECT * - FROM {out_table_1} AS out - WHERE NOT EXISTS( - SELECT 1 - FROM {tmp_view} - WHERE {checkg_o1t_sub} - );""" - plpy.execute(sql_del.format(**locals())) - plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) - plpy.execute("DROP TABLE IF EXISTS {0},{1}".format( - out_table_1,out_table_2)) - else: - sql_del = """ DELETE FROM {out_table} - USING {tmp_view} - WHERE {checkg_ot_sub}""" - plpy.execute(sql_del.format(**locals())) - - plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) - - # If every group has a negative cycle, - # drop the output table as well. - if table_is_empty(out_table): - plpy.execute("DROP TABLE IF EXISTS {0},{1}". - format(out_table,out_table+"_summary")) - plpy.warning( - """Graph APSP: Detected a negative cycle in the """ + - """sub-graphs of following groups: {0}.""". - format(str(negs)[1:-1])) - - else: - plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view)) - if is_hawq: - plpy.execute("DROP TABLE IF EXISTS {0}".format(out_table_2)) - - return None def graph_apsp_get_path(schema_madlib, apsp_table, - source_vertex, dest_vertex, path_table, **kwargs): - """ - Helper function that can be used to get the shortest path between any 2 - vertices + source_vertex, dest_vertex, path_table, **kwargs): + """ + Helper function that can be used to get the shortest path between any 2 + vertices Args: - @param apsp_table Name of the table that contains the APSP - output. - @param source_vertex The vertex that will be the source of the - desired path. - @param dest_vertex The vertex that will be the destination of the - desired path. 
- """ - - with MinWarning("warning"): - _validate_get_path(apsp_table, source_vertex, dest_vertex, path_table) - - temp1_name = unique_string(desp='temp1') - temp2_name = unique_string(desp='temp2') - - summary = plpy.execute("SELECT * FROM {0}_summary".format(apsp_table)) - vertex_id = summary[0]['vertex_id'] - edge_args = summary[0]['edge_args'] - grouping_cols = summary[0]['grouping_cols'] - - params_types = {'src': str, 'dest': str, 'weight': str} - default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'} - edge_params = extract_keyvalue_params(edge_args, - params_types, - default_args) - - src = edge_params["src"] - dest = edge_params["dest"] - weight = edge_params["weight"] - - if vertex_id == "NULL": - vertex_id = "id" - - if grouping_cols == "NULL": - grouping_cols = None - - select_grps = "" - check_grps_t1 = "" - check_grps_t2 = "" - grp_comma = "" - tmp = "" - - if grouping_cols is not None: - glist = split_quoted_delimited_str(grouping_cols) - select_grps = _grp_from_table(apsp_table,glist) + " , " - check_grps_t1 = " AND " + _check_groups( - apsp_table,temp1_name,glist) - check_grps_t2 = " AND " + _check_groups( - apsp_table,temp2_name,glist) - - grp_comma = grouping_cols + " , " - - # If the source and destination is the same vertex. - # There is no need to check the paths for any group. - if source_vertex == dest_vertex: - plpy.execute(""" - CREATE TABLE {path_table} AS - SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path - FROM {apsp_table} WHERE {src} = {source_vertex} AND - {dest} = {dest_vertex} - """.format(**locals())) - return - - plpy.execute( "DROP TABLE IF EXISTS {0},{1}". 
- format(temp1_name,temp2_name)); - - # Initialize the temporary tables - out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS - SELECT {grp_comma} {apsp_table}.parent AS {vertex_id}, - ARRAY[{dest_vertex}] AS path - FROM {apsp_table} - WHERE {src} = {source_vertex} AND {dest} = {dest_vertex} - AND {apsp_table}.parent IS NOT NULL - """.format(**locals())) - - plpy.execute(""" - CREATE TEMP TABLE {temp2_name} AS - SELECT * FROM {temp1_name} LIMIT 0 - """.format(**locals())) - - # Follow the 'parent' chain until you reach the case where the parent - # is the same as destination. This means it is the last vertex before - # the source. - while out.nrows() > 0: - - plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals())) - # If the parent id is not the same as dest, - # that means we have to follow the chain: - # Add it to the path and move to its parent - out = plpy.execute( - """ INSERT INTO {temp2_name} - SELECT {select_grps} {apsp_table}.parent AS {vertex_id}, - {apsp_table}.{dest} || {temp1_name}.path AS path - FROM {apsp_table} INNER JOIN {temp1_name} ON - ({apsp_table}.{dest} = {temp1_name}.{vertex_id} - {check_grps_t1}) - WHERE {src} = {source_vertex} AND - {apsp_table}.parent <> {apsp_table}.{dest} - """.format(**locals())) - - tmp = temp2_name - temp2_name = temp1_name - temp1_name = tmp - - tmp = check_grps_t1 - check_grps_t1 = check_grps_t2 - check_grps_t2 = tmp - - # We have to consider 3 cases. - # 1) The path has more than 2 vertices: - # Add the current parent and the source vertex - # 2) The path has exactly 2 vertices (an edge between src and dest is - # the shortest path). - # Add the source vertex - # 3) The path has 0 vertices (unreachable) - # Add an empty array. 
- - # Path with 1 vertex (src == dest) has been handled before - plpy.execute(""" - CREATE TABLE {path_table} AS - SELECT {grp_comma} {source_vertex} || ({vertex_id} || path) AS path - FROM {temp2_name} - WHERE {vertex_id} <> {dest_vertex} - UNION - SELECT {grp_comma} {source_vertex} || path AS path - FROM {temp2_name} - WHERE {vertex_id} = {dest_vertex} - UNION - SELECT {grp_comma} '{{}}'::INT[] AS path - FROM {apsp_table} - WHERE {src} = {source_vertex} AND {dest} = {dest_vertex} - AND {apsp_table}.parent IS NULL - """.format(**locals())) - - out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table)) - - if out.nrows() == 0: - plpy.error( ("Graph APSP: Vertex {0} and/or {1} is not present"+ - " in the APSP table {1}").format( - source_vertex,dest_vertex,apsp_table)) - - plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}". - format(**locals())) - - return None + @param apsp_table Name of the table that contains the APSP + output. + @param source_vertex The vertex that will be the source of the + desired path. + @param dest_vertex The vertex that will be the destination of the + desired path. 
+ """ + + with MinWarning("warning"): + _validate_get_path(apsp_table, source_vertex, dest_vertex, path_table) + + temp1_name = unique_string(desp='temp1') + temp2_name = unique_string(desp='temp2') + + summary = plpy.execute("SELECT * FROM {0}_summary".format(apsp_table)) + vertex_id = summary[0]['vertex_id'] + edge_args = summary[0]['edge_args'] + grouping_cols = summary[0]['grouping_cols'] + + params_types = {'src': str, 'dest': str, 'weight': str} + default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'} + edge_params = extract_keyvalue_params(edge_args, + params_types, + default_args) + + src = edge_params["src"] + dest = edge_params["dest"] + weight = edge_params["weight"] + + if vertex_id == "NULL": + vertex_id = "id" + + if grouping_cols == "NULL": + grouping_cols = None + + select_grps = "" + check_grps_t1 = "" + check_grps_t2 = "" + grp_comma = "" + tmp = "" + + if grouping_cols is not None: + glist = split_quoted_delimited_str(grouping_cols) + select_grps = _grp_from_table(apsp_table, glist) + " , " + check_grps_t1 = " AND " + _check_groups( + apsp_table, temp1_name, glist) + check_grps_t2 = " AND " + _check_groups( + apsp_table, temp2_name, glist) + + grp_comma = grouping_cols + " , " + + # If the source and destination is the same vertex. + # There is no need to check the paths for any group. + if source_vertex == dest_vertex: + plpy.execute(""" + CREATE TABLE {path_table} AS + SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path + FROM {apsp_table} WHERE {src} = {source_vertex} AND + {dest} = {dest_vertex} + """.format(**locals())) + return + + plpy.execute("DROP TABLE IF EXISTS {0},{1}". 
+ format(temp1_name, temp2_name)) + + # Initialize the temporary tables + out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS + SELECT {grp_comma} {apsp_table}.parent AS {vertex_id}, + ARRAY[{dest_vertex}] AS path + FROM {apsp_table} + WHERE {src} = {source_vertex} AND {dest} = {dest_vertex} + AND {apsp_table}.parent IS NOT NULL + """.format(**locals())) + + plpy.execute(""" + CREATE TEMP TABLE {temp2_name} AS + SELECT * FROM {temp1_name} LIMIT 0 + """.format(**locals())) + + # Follow the 'parent' chain until you reach the case where the parent + # is the same as destination. This means it is the last vertex before + # the source. + while out.nrows() > 0: + + plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals())) + # If the parent id is not the same as dest, + # that means we have to follow the chain: + # Add it to the path and move to its parent + out = plpy.execute( + """ INSERT INTO {temp2_name} + SELECT {select_grps} {apsp_table}.parent AS {vertex_id}, + {apsp_table}.{dest} || {temp1_name}.path AS path + FROM {apsp_table} INNER JOIN {temp1_name} ON + ({apsp_table}.{dest} = {temp1_name}.{vertex_id} + {check_grps_t1}) + WHERE {src} = {source_vertex} AND + {apsp_table}.parent <> {apsp_table}.{dest} + """.format(**locals())) + + tmp = temp2_name + temp2_name = temp1_name + temp1_name = tmp + + tmp = check_grps_t1 + check_grps_t1 = check_grps_t2 + check_grps_t2 = tmp + + # We have to consider 3 cases. + # 1) The path has more than 2 vertices: + # Add the current parent and the source vertex + # 2) The path has exactly 2 vertices (an edge between src and dest is + # the shortest path). + # Add the source vertex + # 3) The path has 0 vertices (unreachable) + # Add an empty array. 
+ + # Path with 1 vertex (src == dest) has been handled before + plpy.execute(""" + CREATE TABLE {path_table} AS + SELECT {grp_comma} {source_vertex} || ({vertex_id} || path) AS path + FROM {temp2_name} + WHERE {vertex_id} <> {dest_vertex} + UNION + SELECT {grp_comma} {source_vertex} || path AS path + FROM {temp2_name} + WHERE {vertex_id} = {dest_vertex} + UNION + SELECT {grp_comma} '{{}}'::INT[] AS path + FROM {apsp_table} + WHERE {src} = {source_vertex} AND {dest} = {dest_vertex} + AND {apsp_table}.parent IS NULL + """.format(**locals())) + + out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table)) + + if out.nrows() == 0: + plpy.error(("Graph APSP: Vertex {0} and/or {1} is not present" + + " in the APSP table {1}").format( + source_vertex, dest_vertex, apsp_table)) + + plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}". + format(**locals())) + + return None + def _validate_apsp(vertex_table, vertex_id, edge_table, edge_params, - out_table, glist, **kwargs): + out_table, glist, **kwargs): + + validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, + out_table, 'APSP') - validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, - out_table,'APSP') + vt_error = plpy.execute( + """ SELECT {vertex_id} + FROM {vertex_table} + WHERE {vertex_id} IS NOT NULL + GROUP BY {vertex_id} + HAVING count(*) > 1 """.format(**locals())) - vt_error = plpy.execute( - """ SELECT {vertex_id} - FROM {vertex_table} - WHERE {vertex_id} IS NOT NULL - GROUP BY {vertex_id} - HAVING count(*) > 1 """.format(**locals())) + if vt_error.nrows() != 0: + plpy.error( + """Graph APSP: Source vertex table {vertex_table} contains duplicate vertex id's.""". + format(**locals())) - if vt_error.nrows() != 0: - plpy.error( - """Graph APSP: Source vertex table {vertex_table} contains duplicate vertex id's.""". 
- format(**locals())) + _assert(not table_exists(out_table + "_summary"), + "Graph APSP: Output summary table already exists!") - _assert(not table_exists(out_table+"_summary"), - "Graph APSP: Output summary table already exists!") + if glist is not None: + _assert(columns_exist_in_table(edge_table, glist), + """Graph APSP: Not all columns from {glist} are present in edge table ({edge_table}).""". + format(**locals())) - if glist is not None: - _assert(columns_exist_in_table(edge_table, glist), - """Graph APSP: Not all columns from {glist} are present in edge table ({edge_table}).""". - format(**locals())) def _validate_get_path(apsp_table, source_vertex, dest_vertex, - path_table, **kwargs): + path_table, **kwargs): - _assert(apsp_table and apsp_table.strip().lower() not in ('null', ''), - "Graph APSP: Invalid APSP table name!") - _assert(table_exists(apsp_table), - "Graph APSP: APSP table ({0}) is missing!".format(apsp_table)) - _assert(not table_is_empty(apsp_table), - "Graph APSP: APSP table ({0}) is empty!".format(apsp_table)) + _assert(apsp_table and apsp_table.strip().lower() not in ('null', ''), + "Graph APSP: Invalid APSP table name!") + _assert(table_exists(apsp_table), + "Graph APSP: APSP table ({0}) is missing!".format(apsp_table)) + _assert(not table_is_empty(apsp_table), + "Graph APSP: APSP table ({0}) is empty!".format(apsp_table)) - summary = apsp_table+"_summary" - _assert(table_exists(summary), - "Graph APSP: APSP summary table ({0}) is missing!".format(summary)) - _assert(not table_is_empty(summary), - "Graph APSP: APSP summary table ({0}) is empty!".format(summary)) + summary = apsp_table + "_summary" + _assert(table_exists(summary), + "Graph APSP: APSP summary table ({0}) is missing!".format(summary)) + _assert(not table_is_empty(summary), + "Graph APSP: APSP summary table ({0}) is empty!".format(summary)) - _assert(not table_exists(path_table), - "Graph APSP: Output path table already exists!") + _assert(not table_exists(path_table), + "Graph 
APSP: Output path table already exists!") + + return None - return None def graph_apsp_help(schema_madlib, message, **kwargs): - """ - Help function for graph_apsp and graph_apsp_get_path - - Args: - @param schema_madlib - @param message: string, Help message string - @param kwargs - - Returns: - String. Help/usage information - """ - if not message: - help_string = """ + """ + Help function for graph_apsp and graph_apsp_get_path + + Args: + @param schema_madlib + @param message: string, Help message string + @param kwargs + + Returns: + String. Help/usage information + """ + if not message: + help_string = """ ----------------------------------------------------------------------- SUMMARY ----------------------------------------------------------------------- @@ -717,8 +717,8 @@ edges is minimized. For more details on function usage: SELECT {schema_madlib}.graph_apsp('usage') """ - elif message.lower() in ['usage', 'help', '?']: - help_string = """ + elif message.lower() in ['usage', 'help', '?']: + help_string = """ Given a graph, all pairs shortest path (apsp) algorithm finds a path for every vertex pair such that the sum of the weights of its constituent edges is minimized. @@ -728,10 +728,10 @@ edges is minimized. To retrieve the path for a specific vertex pair: SELECT {schema_madlib}.graph_apsp_get_path( - apsp_table TEXT, -- Name of the table that contains the apsp output. + apsp_table TEXT, -- Name of the table that contains the apsp output. source_vertex INT, -- The vertex that will be the source of the -- desired path. - dest_vertex INT, -- The vertex that will be the destination of the + dest_vertex INT, -- The vertex that will be the destination of the -- desired path. path_table TEXT -- Name of the output table that contains the path. ); @@ -762,8 +762,8 @@ every group and has the following columns: - path (ARRAY) : The shortest path from the source vertex to the destination vertex. 
""" - elif message.lower() in ("example", "examples"): - help_string = """ + elif message.lower() in ("example", "examples"): + help_string = """ ---------------------------------------------------------------------------- EXAMPLES ---------------------------------------------------------------------------- @@ -806,11 +806,11 @@ INSERT INTO edge VALUES -- Compute the apsp: DROP TABLE IF EXISTS out; SELECT madlib.graph_apsp( - 'vertex', -- Vertex table - 'id', -- Vertix id column - 'edge', -- Edge table - 'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments - 'out' -- Output table of apsp + 'vertex', -- Vertex table + 'id', -- Vertix id column + 'edge', -- Edge table + 'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments + 'out' -- Output table of apsp ); -- View the apsp costs for every vertex: SELECT * FROM out ORDER BY src, dest; @@ -834,11 +834,11 @@ INSERT INTO edge_gr VALUES DROP TABLE IF EXISTS out_gr, out_gr_summary; SELECT graph_apsp('vertex',NULL,'edge_gr',NULL,'out_gr','grp'); """ - else: - help_string = "No such option. Use {schema_madlib}.graph_apsp()" + else: + help_string = "No such option. Use {schema_madlib}.graph_apsp()" - return help_string.format(schema_madlib=schema_madlib, - graph_usage=get_graph_usage(schema_madlib, 'graph_apsp', - """out_table TEXT, -- Name of the table to store the result of apsp. + return help_string.format(schema_madlib=schema_madlib, + graph_usage=get_graph_usage(schema_madlib, 'graph_apsp', + """out_table TEXT, -- Name of the table to store the result of apsp. 
grouping_cols TEXT -- The list of grouping columns.""")) # --------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/graph_utils.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/graph_utils.py_in b/src/ports/postgres/modules/graph/graph_utils.py_in index 944c301..9d31345 100644 --- a/src/ports/postgres/modules/graph/graph_utils.py_in +++ b/src/ports/postgres/modules/graph/graph_utils.py_in @@ -27,115 +27,115 @@ @namespace graph """ -import plpy -from utilities.control import MinWarning from utilities.utilities import _assert -from utilities.utilities import extract_keyvalue_params -from utilities.utilities import unique_string from utilities.validate_args import get_cols from utilities.validate_args import unquote_ident from utilities.validate_args import table_exists from utilities.validate_args import columns_exist_in_table from utilities.validate_args import table_is_empty + def _grp_null_checks(grp_list): """ - Helper function for generating NULL checks for grouping columns + Helper function for generating NULL checks for grouping columns to be used within a WHERE clause Args: @param grp_list The list of grouping columns """ return ' AND '.join([" {i} IS NOT NULL ".format(**locals()) - for i in grp_list]) + for i in grp_list]) + def _check_groups(tbl1, tbl2, grp_list): + """ + Helper function for joining tables with groups. + Args: + @param tbl1 Name of the first table + @param tbl2 Name of the second table + @param grp_list The list of grouping columns + """ - """ - Helper function for joining tables with groups. 
- Args: - @param tbl1 Name of the first table - @param tbl2 Name of the second table - @param grp_list The list of grouping columns - """ + return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals()) + for i in grp_list]) - return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals()) - for i in grp_list]) def _grp_from_table(tbl, grp_list): + """ + Helper function for selecting grouping columns of a table + Args: + @param tbl Name of the table + @param grp_list The list of grouping columns + """ + return ' , '.join([" {tbl}.{i} ".format(**locals()) + for i in grp_list]) - """ - Helper function for selecting grouping columns of a table - Args: - @param tbl Name of the table - @param grp_list The list of grouping columns - """ - return ' , '.join([" {tbl}.{i} ".format(**locals()) - for i in grp_list]) def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, - out_table, func_name, **kwargs): - """ - Validates graph tables (vertex and edge) as well as the output table. 
- """ - _assert(out_table and out_table.strip().lower() not in ('null', ''), - "Graph {func_name}: Invalid output table name!".format(**locals())) - _assert(not table_exists(out_table), - "Graph {func_name}: Output table already exists!".format(**locals())) - - _assert(vertex_table and vertex_table.strip().lower() not in ('null', ''), - "Graph {func_name}: Invalid vertex table name!".format(**locals())) - _assert(table_exists(vertex_table), - "Graph {func_name}: Vertex table ({vertex_table}) is missing!".format( - **locals())) - _assert(not table_is_empty(vertex_table), - "Graph {func_name}: Vertex table ({vertex_table}) is empty!".format( - **locals())) - - _assert(edge_table and edge_table.strip().lower() not in ('null', ''), - "Graph {func_name}: Invalid edge table name!".format(**locals())) - _assert(table_exists(edge_table), - "Graph {func_name}: Edge table ({edge_table}) is missing!".format( - **locals())) - _assert(not table_is_empty(edge_table), - "Graph {func_name}: Edge table ({edge_table}) is empty!".format( - **locals())) - - existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table)) - _assert(vertex_id in existing_cols, - """Graph {func_name}: The vertex column {vertex_id} is not present in vertex table ({vertex_table}) """. - format(**locals())) - _assert(columns_exist_in_table(edge_table, edge_params.values()), - """Graph {func_name}: Not all columns from {cols} are present in edge table ({edge_table})""". - format(cols=edge_params.values(), **locals())) - - return None + out_table, func_name, **kwargs): + """ + Validates graph tables (vertex and edge) as well as the output table. 
+ """ + _assert(out_table and out_table.strip().lower() not in ('null', ''), + "Graph {func_name}: Invalid output table name!".format(**locals())) + _assert(not table_exists(out_table), + "Graph {func_name}: Output table already exists!".format(**locals())) + + _assert(vertex_table and vertex_table.strip().lower() not in ('null', ''), + "Graph {func_name}: Invalid vertex table name!".format(**locals())) + _assert(table_exists(vertex_table), + "Graph {func_name}: Vertex table ({vertex_table}) is missing!".format( + **locals())) + _assert(not table_is_empty(vertex_table), + "Graph {func_name}: Vertex table ({vertex_table}) is empty!".format( + **locals())) + + _assert(edge_table and edge_table.strip().lower() not in ('null', ''), + "Graph {func_name}: Invalid edge table name!".format(**locals())) + _assert(table_exists(edge_table), + "Graph {func_name}: Edge table ({edge_table}) is missing!".format( + **locals())) + _assert(not table_is_empty(edge_table), + "Graph {func_name}: Edge table ({edge_table}) is empty!".format( + **locals())) + + existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table)) + _assert(vertex_id in existing_cols, + """Graph {func_name}: The vertex column {vertex_id} is not present in vertex table ({vertex_table}) """. + format(**locals())) + _assert(columns_exist_in_table(edge_table, edge_params.values()), + """Graph {func_name}: Not all columns from {cols} are present in edge table ({edge_table})""". + format(cols=edge_params.values(), **locals())) + + return None + def get_graph_usage(schema_madlib, func_name, other_text): - usage = """ ----------------------------------------------------------------------------- - USAGE ----------------------------------------------------------------------------- - SELECT {schema_madlib}.{func_name}( - vertex_table TEXT, -- Name of the table that contains the vertex data. - vertex_id TEXT, -- Name of the column containing the vertex ids. 
- edge_table TEXT, -- Name of the table that contains the edge data. - edge_args TEXT{comma} -- A comma-delimited string containing multiple - -- named arguments of the form "name=value". - {other_text} -); - -The following parameters are supported for edge table arguments ('edge_args' - above): - -src (default = 'src') : Name of the column containing the source - vertex ids in the edge table. -dest (default = 'dest') : Name of the column containing the destination - vertex ids in the edge table. -weight (default = 'weight') : Name of the column containing the weight of - edges in the edge table. -""".format(schema_madlib=schema_madlib, func_name=func_name, - other_text=other_text, comma = ',' if other_text is not None else ' ') - - return usage + usage = """ + ---------------------------------------------------------------------------- + USAGE + ---------------------------------------------------------------------------- + SELECT {schema_madlib}.{func_name}( + vertex_table TEXT, -- Name of the table that contains the vertex data. + vertex_id TEXT, -- Name of the column containing the vertex ids. + edge_table TEXT, -- Name of the table that contains the edge data. + edge_args TEXT{comma} -- A comma-delimited string containing multiple + -- named arguments of the form "name=value". + {other_text} + ); + + The following parameters are supported for edge table arguments + ('edge_args' above): + + src (default = 'src'): Name of the column containing the source + vertex ids in the edge table. + dest (default = 'dest'): Name of the column containing the destination + vertex ids in the edge table. + weight (default = 'weight'): Name of the column containing the weight of + edges in the edge table. + """.format(schema_madlib=schema_madlib, + func_name=func_name, + other_text=other_text, + comma=',' if other_text is not None else ' ') + return usage