Hi all,

I created a running example of my data set below and describe what I want to achieve. The idea is to create a view over the resulting table and use it for later joins, instead of applying a UDF to a column that does lookups in a dict of 20+ million (and growing) records.
Example data set:

spark.createDataFrame(
    [
        ("inquiry1", "quotation1"),
        ("inquiry2", "quotation2"),
        ("quotation2", "order2"),
        ("order2", "invoice2"),
        ("order3", "invoice3"),
    ],
    ["parent", "child"],
).createOrReplaceTempView("hierarchy")

We see several hierarchies in the df above, but we don't have records indicating that e.g. inquiry1 is the root of one of them. So we have:

1: inquiry1 > quotation1
2: inquiry2 > quotation2 > order2 > invoice2
3: order3 > invoice3

What I need is the following: for every child, its level-0 parent, like this:

child       lvl-0-parent
quotation1  inquiry1
quotation2  inquiry2
order2      inquiry2
invoice2    inquiry2
invoice3    order3

It would be perfect to also see that some of the entries are themselves roots, by indicating:

child       lvl-0-parent
inquiry1    null
inquiry2    null
order3      null

That is what I realized with the recursive UDF from my initial post (quoted below). Thank you for any hints on that issue! Any hints on the UDF solution are also very welcome; two sketches of alternatives I have been experimenting with follow at the very end of this mail.

Thx and best,
Meikel

From: Bode, Meikel, NMA-CFD
Sent: Friday, 30 April 2021 12:16
To: user@spark <user@spark.apache.org>
Subject: Recursive Queries or Recursive UDF?

Hi all,

I implemented a recursive UDF that tries to find a document number in a long list of predecessor documents. This can be a multi-level hierarchy: C is the successor of B, which is the successor of A (but many more levels are possible).

As input to that UDF I prepare a dict that contains the complete document flow, reduced to the fields required to follow the path back to the originating document. The dict is broadcast and then used by the UDF. This approach is very slow, and now, as the data grows, it regularly kills my executors, so that RDDs get lost and tasks fail. Sometimes the workers (Docker containers) also become unresponsive and get killed.

Here is the code of the methods:

1.: Prepare and register the UDF, broadcast the dict.

# Define function for the recursive lookup of the root document
def __gen_caseid_udf_sales_document_flow(self):
    global bc_document_flow, udf_sales_document_flow

    # Reduce the document flow to the fields required to follow
    # the path back to the originating document
    df_vbfa_subset = self.spark.table("FLOWTABLE").select(
        "clnt", "predecessor_head", "predecessor_item", "doc_num", "doc_item")

    # Collect the subset to the driver and build the lookup dict:
    # (clnt, doc_num, doc_item) -> (predecessor_head, predecessor_item)
    document_flow_dic = {}
    for clnt, predecessor_head, predecessor_item, doc_num, doc_item in df_vbfa_subset.rdd.collect():
        document_flow_dic[(clnt, doc_num, doc_item)] = predecessor_head, predecessor_item

    # Broadcast the dictionary to the workers
    bc_document_flow = self.spark.sparkContext.broadcast(document_flow_dic)

    # Register the user-defined function (UDF)
    udf_sales_document_flow = func.udf(gen_caseid_udf_sale_root_lookup)

2.: The recursive function used by the UDF.

# Find the root document by walking the predecessor links
def gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr, posnr):
    if not clnt or not docnr or not posnr:
        return None, None
    key = clnt, docnr, posnr
    if key in lt:
        docnr_tmp, item_tmp = lt[key]
        if docnr_tmp == docnr and item_tmp == posnr:
            # Self-referencing entry: treat the document as its own root
            return docnr, posnr
        else:
            return gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr_tmp, item_tmp)
    else:
        # No predecessor known: this is the root
        return docnr, posnr

3.: The UDF.

# Look up the root document and build the case id from it
def gen_caseid_udf_sale_root_lookup(clnt, doc_num, posnr):
    global bc_document_flow  # name of the broadcast variable
    lt = bc_document_flow.value
    h, p = gen_caseid_udf_sale_get_root_doc(lt, clnt, doc_num, posnr)
    return str(clnt) + str(h) + str(p)
4.: Usage of the UDF on a DF that might contain several tens of thousands of rows.

# Look up the root document from the document flow
documents = documents.withColumn(
    "root_doc",
    udf_sales_document_flow(
        func.col("clnt"), func.col("document_number"), func.col("item_number")))

Do you have any hints on my code, or are there ideas for implementing a recursive select without writing a potentially unoptimizable UDF? I came across https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL, which might be an option. Does Spark support this kind of construct?

Thanks and all the best,
Meikel
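P.S. As far as I know, Spark SQL does not support the WITH RECURSIVE construct from that Wikipedia page, so the usual workaround seems to be an iterative self-join driven from the driver. Here is a minimal sketch of what I have been experimenting with, run only against the toy "hierarchy" view from the top of this mail; the names edges, result, pending, roots and the view name lvl0_parent are my own, and it assumes the document flow contains no cycles (otherwise the loop would not terminate):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
edges = spark.table("hierarchy")  # columns: parent, child

# Level 1: every child starts with its direct parent as candidate root.
result = edges.select("child", F.col("parent").alias("root"))

while True:
    # Rows whose candidate root still has a predecessor need another hop.
    pending = result.alias("r").join(
        edges.alias("e"), F.col("r.root") == F.col("e.child"), "inner")
    if pending.count() == 0:
        break
    # Replace each candidate root by its own parent where one exists.
    result = (
        result.alias("r")
        .join(edges.alias("e"), F.col("r.root") == F.col("e.child"), "left")
        .select(
            F.col("r.child"),
            F.coalesce(F.col("e.parent"), F.col("r.root")).alias("root"),
        )
    )

# Optionally list the roots themselves with a null lvl-0 parent.
roots = (
    edges.select(F.col("parent").alias("child"))
    .subtract(edges.select("child"))
    .withColumn("root", F.lit(None).cast("string"))
)

result.unionByName(roots).createOrReplaceTempView("lvl0_parent")

On the toy data this produces exactly the child/lvl-0-parent pairs listed above. Each loop iteration resolves one hierarchy level and adds one join to the plan, so for deep hierarchies it is probably worth checkpointing result every few iterations to keep the lineage short.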
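P.P.S. On the UDF itself: the recursive lookup pays Python call overhead per level and will hit the interpreter's default recursion limit on very deep (or accidentally cyclic) chains. A plain loop with a visited set should return the same results while also guarding against cycles. A sketch, assuming the same dict layout as in step 1 ((clnt, doc_num, doc_item) -> (predecessor_head, predecessor_item)):

# Iterative variant of the root-document lookup
def gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr, posnr):
    if not clnt or not docnr or not posnr:
        return None, None
    seen = set()
    while True:
        key = (clnt, docnr, posnr)
        # Root reached (no predecessor known) or cycle detected.
        if key not in lt or key in seen:
            return docnr, posnr
        seen.add(key)
        pred_doc, pred_item = lt[key]
        # A self-referencing entry also terminates the walk.
        if pred_doc == docnr and pred_item == posnr:
            return docnr, posnr
        docnr, posnr = pred_doc, pred_item

This still keeps the broadcast dict, though, so it does not address the memory pressure on the executors; the join-based view above avoids collecting the 20+ million records to the driver entirely.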