Re: Use Spark Aggregator in PySpark
Hi,

For an aggregating UDF, use spark.udf.registerJavaUDAF(name, className).

Enrico

On 23.04.23 at 23:42, Thomas Wang wrote:

Hi Spark Community,

I have implemented a custom Spark Aggregator (a subclass of org.apache.spark.sql.expressions.Aggregator). Now I'm trying to use it in a PySpark application, but for some reason I'm not able to trigger the function. Here is what I'm doing; could someone help me take a look? Thanks.

```
spark = self._gen_spark_session()
spark.udf.registerJavaFunction(
    name="MyAggrator",
    javaClassName="my.package.MyAggrator",
    returnType=ArrayType(elementType=LongType()),
)
```

The above code runs successfully. However, to call it, I assume I should do something like the following:

```
df = df.groupBy().agg(
    functions.expr("MyAggrator(input)").alias("output"),
)
```

But this one gives me the following error:

pyspark.sql.utils.AnalysisException: UDF class my.package.MyAggrator doesn't implement any UDF interface

My question is: how can I use a Spark Aggregator defined in a jar file from PySpark? Thanks.

Thomas
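A minimal sketch of the suggestion above, assuming the jar containing my.package.MyAggrator is already on the driver and executor classpath; the class name and the "input" column are taken from the question, and the sample DataFrame is only for illustration:

```
from pyspark.sql import SparkSession, functions

spark = SparkSession.builder.getOrCreate()

# Register the JVM-side aggregate under a SQL-callable name. Note that no
# returnType is passed: registerJavaUDAF takes only a name and a class name,
# because the result type comes from the Java/Scala class itself.
spark.udf.registerJavaUDAF("MyAggrator", "my.package.MyAggrator")

# Call it like any built-in aggregate function.
df = spark.createDataFrame([(1,), (2,), (3,)], ["input"])
df.groupBy().agg(functions.expr("MyAggrator(input)").alias("output")).show()
```

For context on the error above: registerJavaFunction only accepts classes implementing the scalar UDF0...UDF22 interfaces, which is why it rejects an Aggregator with "doesn't implement any UDF interface". One caveat to hedge: registerJavaUDAF expects a subclass of org.apache.spark.sql.expressions.UserDefinedAggregateFunction, so a plain Aggregator may first need to be wrapped via org.apache.spark.sql.functions.udaf(...) on the JVM side.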
Reg: Creating Spark on a virtual machine through Chef
Hi team, My name is Akhil. I am trying to set up Spark on a virtual machine through Chef. Could you please help us with how to do this? If possible, could you please share the relevant documentation? Regards, Akhil
Unsubscribing
Hello, does this mailing list have an administrator, please? I'm trying to unsubscribe, but to no avail. Many thanks. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
What is the best way to organize a join within a foreach?
I have two tables: {users, orders}. In this example, let's say that for each 1 User in the users table, there are 10 Orders in the orders table. I have to use pyspark to generate a statement of Orders for each User, so a single user will need his/her own list of Orders. Additionally, I need to send this statement to the real-world user via email (for example).

My first intuition was to apply a DataFrame.foreach() on the users DataFrame. This way, I can rely on the Spark workers to handle the email sending individually. However, I now do not know the best way to get each User's Orders. I will soon try the following (pseudo-code):

```
users_df = ...
orders_df = ...  # this is poorly named for max understandability in this context

def foreach_function(row):
    user_id = row.user_id
    user_orders_df = orders_df.filter(f'user_id = {user_id}')
    # here, I'd get any User info from 'row'
    # then, I'd convert all 'user_orders' to JSON
    # then, I'd prepare the email and send it

users_df.foreach(foreach_function)
```

It is my understanding that if I do my user-specific work in the foreach function, I will capitalize on Spark's scalability when doing that work. However, I am worried about two things:

- If I take all Orders up front... Will that work? Will I be taking too much? Will I be taking Orders on partitions that won't handle them (different User)?
- If I create the orders_df (filtered) within the foreach function... Will it work? Will that be too much IO to the DB?

The question ultimately is: how can I achieve this goal efficiently? I have not yet tried anything here. I am doing so as we speak, but am suffering from choice-paralysis. Please and thank you.
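One note on the pseudo-code above: a DataFrame such as orders_df cannot be referenced inside a function that foreach() runs on the executors, so the per-row filter would fail at runtime. A minimal sketch of a common alternative, joining once and collapsing each user's orders with collect_list; the column names email, order_id, and amount, and the send_email helper, are hypothetical placeholders:

```
from pyspark.sql import functions as F

# Collapse each user's orders into a single JSON string. order_id and
# amount stand in for whatever columns orders_df actually holds.
orders_per_user = (
    orders_df
    .groupBy("user_id")
    .agg(F.to_json(F.collect_list(F.struct("order_id", "amount"))).alias("orders_json"))
)

# One distributed join instead of a per-row lookup against the DB.
statements = users_df.join(orders_per_user, on="user_id", how="left")

def send_partition(rows):
    # Per-connection setup (e.g. an SMTP session) would go here, reused
    # for every row in the partition. send_email is a hypothetical helper.
    for row in rows:
        send_email(to=row.email, body=row.orders_json)

statements.foreachPartition(send_partition)
```

Using foreachPartition rather than foreach amortizes connection setup across all rows of a partition, and the up-front aggregation avoids both worries above: no per-row DB round-trips, and each worker only ever sees the already-matched user/orders pairs in its own partitions.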