Thanks team,

Email was just an example. The point was to illustrate that some actions can be chained using Spark's foreach. In reality, the chained actions are an S3 write and a Kafka message production, which I think is quite reasonable for Spark to do.
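For concreteness, here is a rough, untested sketch of the kind of chaining I mean. The bucket, topic, broker address, and the statements_df layout (user_id, orders_json) are placeholders of mine, and it assumes boto3 and kafka-python are available on the workers:

```
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("statements").getOrCreate()

# Placeholder input: one row per user, statement already rendered as JSON.
statements_df = spark.read.parquet("s3://my-bucket/statements_src/")

def process_partition(rows):
    # Create the clients once per partition, not once per row.
    import boto3
    from kafka import KafkaProducer  # assumes kafka-python on the workers
    s3 = boto3.client("s3")
    producer = KafkaProducer(bootstrap_servers="broker:9092")  # placeholder broker
    for row in rows:
        key = f"statements/{row.user_id}.json"
        # Chained action 1: write the user's statement to S3.
        s3.put_object(Bucket="my-bucket", Key=key,
                      Body=row.orders_json.encode("utf-8"))
        # Chained action 2: produce a Kafka message pointing at it.
        producer.send("statements-ready",
                      json.dumps({"user_id": row.user_id, "s3_key": key}).encode("utf-8"))
    producer.flush()

# foreachPartition rather than foreach, so each partition shares one client pair.
statements_df.foreachPartition(process_partition)
```

The per-partition client reuse is the main thing I would watch for there.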
To answer Ayan's first question: yes, all of a user's orders, prepared for each and every user. Other than the remarks that email transmission is unwise (which, as I've now pointed out, was only an example anyway), I am not seeing an alternative to using Spark's foreach. Unless your proposal is for the Spark job to target one user, and simply run the job thousands of times, taking the user_id as input? That doesn't sound attractive. Also, while we say that foreach is not optimal, I cannot find any evidence of that, either here or online. If there are any docs about the inner workings of this functionality, please pass them to me. I continue to search for them. Even late last night!

Thanks for your help team,
Marco.

On Wed, Apr 26, 2023 at 6:21 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Indeed very valid points by Ayan. How is email going to handle 1000s of
> records? As a solution architect I tend to replace users by customers,
> and for each order there must be products: a many-to-many relationship of
> sorts. If I were a customer I would also be interested in the product
> details as well. Sending via email sounds like a Jurassic Park solution 😗
>
> On Wed, 26 Apr 2023 at 10:24, ayan guha <guha.a...@gmail.com> wrote:
>
>> Adding to what Mich said,
>>
>> 1. Are you trying to send statements of all orders to all users? Or the
>> latest order only?
>>
>> 2. Sending email is not a good use of Spark. Instead, I suggest using a
>> notification service or function. Spark should write to a queue (kafka,
>> sqs... pick your choice here).
>>
>> Best regards
>> Ayan
>>
>> On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Well, OK. In a nutshell, you want the result set for every user
>>> prepared and emailed to that user, right?
>>>
>>> This is a form of ETL where those result sets need to be posted
>>> somewhere. Say you create a table based on the result set prepared for
>>> each user. You may have many raw target tables at the end of the first
>>> ETL. How does this differ from using forEach? Performance-wise, forEach
>>> may not be optimal.
>>>
>>> Can you take the sample tables and try your method?
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary
>>> damages arising from such loss, damage or destruction.
>>>
>>> On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
>>> marco.costant...@rocketfncl.com> wrote:
>>>
>>>> Hi Mich,
>>>> First, thank you for that. Great effort put into helping.
>>>>
>>>> Second, I don't think this tackles the technical challenge here. I
>>>> understand the windowing as it serves those ranks you created, but I
>>>> don't see how the ranks contribute to the solution.
>>>>
>>>> Third, the core of the challenge is about performing this kind of
>>>> 'statement' but for all users. In this example we target Mich, but
>>>> that reduces the complexity by a lot! In fact, a simple join and
>>>> filter would solve that one.
>>>>
>>>> Any thoughts on that?
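>>>> To make "all users" concrete: the closest set-based version I can
>>>> picture is one aggregation plus a partitioned write, rather than a
>>>> loop over users. A rough, untested sketch, using the orders_df from my
>>>> script (the output path is a placeholder of mine):
>>>>
>>>> ```
>>>> from pyspark.sql.functions import collect_list, struct, to_json
>>>>
>>>> # One statement row per user; Spark splits the output by user on write.
>>>> statements_df = (orders_df
>>>>     .groupBy('user_id')
>>>>     .agg(collect_list(to_json(struct('id', 'description', 'amount')))
>>>>          .alias('orders')))
>>>> (statements_df.write.mode('overwrite')
>>>>     .partitionBy('user_id')
>>>>     .json('/tmp/statements'))
>>>> ```
>>>>
>>>> A downstream notification service could then pick up each user's file
>>>> from there. But that still leaves my question about foreach open.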
>>>> For me, the foreach is desirable because I can have the workers chain
>>>> other actions to each iteration (send email, send HTTP request, etc.).
>>>>
>>>> Thanks Mich,
>>>> Marco.
>>>>
>>>> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> First thoughts.
>>>>>
>>>>> foreach() is an action operation that iterates/loops over each
>>>>> element in the dataset, meaning it is cursor-based. That is different
>>>>> from operating over the dataset as a set, which is far more efficient.
>>>>>
>>>>> So in your case, if I understand it correctly, you want to get the
>>>>> orders for each user (say Mich), convert the result set to JSON and
>>>>> send it to Mich via email.
>>>>>
>>>>> Let us try this based on sample data.
>>>>>
>>>>> Put your csv files into an HDFS directory:
>>>>>
>>>>> hdfs dfs -put users.csv /data/stg/test
>>>>> hdfs dfs -put orders.csv /data/stg/test
>>>>>
>>>>> Then create dataframes from the csv files, create temp views and do a
>>>>> join on the result sets, with some slicing and dicing on the orders
>>>>> table:
>>>>>
>>>>> #! /usr/bin/env python3
>>>>> from __future__ import print_function
>>>>> import sys
>>>>> import findspark
>>>>> findspark.init()
>>>>> from pyspark.sql import SparkSession
>>>>> from pyspark import SparkContext
>>>>> from pyspark.sql import SQLContext, HiveContext
>>>>> from pyspark.sql.window import Window
>>>>>
>>>>> def spark_session(appName):
>>>>>     return SparkSession.builder \
>>>>>         .appName(appName) \
>>>>>         .enableHiveSupport() \
>>>>>         .getOrCreate()
>>>>>
>>>>> def main():
>>>>>     appName = "ORDERS"
>>>>>     spark = spark_session(appName)
>>>>>     # get the sample
>>>>>     users_file = "hdfs://rhes75:9000/data/stg/test/users.csv"
>>>>>     orders_file = "hdfs://rhes75:9000/data/stg/test/orders.csv"
>>>>>     users_df = spark.read.format("com.databricks.spark.csv") \
>>>>>         .option("inferSchema", "true") \
>>>>>         .option("header", "true") \
>>>>>         .load(users_file)
>>>>>     users_df.printSchema()
>>>>>     """
>>>>>     root
>>>>>      |-- id: integer (nullable = true)
>>>>>      |-- name: string (nullable = true)
>>>>>     """
>>>>>     print(f"""\n Reading from {users_file}\n""")
>>>>>     users_df.show(5, False)
>>>>>     orders_df = spark.read.format("com.databricks.spark.csv") \
>>>>>         .option("inferSchema", "true") \
>>>>>         .option("header", "true") \
>>>>>         .load(orders_file)
>>>>>     orders_df.printSchema()
>>>>>     """
>>>>>     root
>>>>>      |-- id: integer (nullable = true)
>>>>>      |-- description: string (nullable = true)
>>>>>      |-- amount: double (nullable = true)
>>>>>      |-- user_id: integer (nullable = true)
>>>>>     """
>>>>>     print(f"""\n Reading from {orders_file}\n""")
>>>>>     orders_df.show(50, False)
>>>>>     users_df.createOrReplaceTempView("users")
>>>>>     orders_df.createOrReplaceTempView("orders")
>>>>>     # Create a list of orders for each user
>>>>>     print(f"""\n Doing a join on two temp views\n""")
>>>>>     sqltext = """
>>>>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>>>>     FROM
>>>>>     (
>>>>>         SELECT
>>>>>             user_id AS user_id
>>>>>           , id AS order_id
>>>>>           , description AS description
>>>>>           , amount AS amount
>>>>>           , DENSE_RANK() OVER (PARTITION BY user_id ORDER BY amount) AS RANK
>>>>>           , MAX(amount) OVER (PARTITION BY user_id ORDER BY id) AS maxorders
>>>>>         FROM orders
>>>>>     ) t
>>>>>     INNER JOIN users u ON t.user_id = u.id
>>>>>     AND u.name = 'Mich'
>>>>>     ORDER BY t.order_id
>>>>>     """
>>>>>     spark.sql(sqltext).show(50)
>>>>>
>>>>> if __name__ == '__main__':
>>>>>     main()
>>>>>
>>>>> Final outcome displaying orders for user Mich:
>>>>>
>>>>> Doing a join on two temp views
>>>>>
>>>>> +----+--------+-----------------+------+---------+
>>>>> |name|order_id|      description|amount|maxorders|
>>>>> +----+--------+-----------------+------+---------+
>>>>> |Mich|   50001| Mich's 1st order|101.11|   101.11|
>>>>> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
>>>>> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
>>>>> |Mich|   50004| Mich's 4th order|104.11|   104.11|
>>>>> |Mich|   50005| Mich's 5th order|105.11|   105.11|
>>>>> |Mich|   50006| Mich's 6th order|106.11|   106.11|
>>>>> |Mich|   50007| Mich's 7th order|107.11|   107.11|
>>>>> |Mich|   50008| Mich's 8th order|108.11|   108.11|
>>>>> |Mich|   50009| Mich's 9th order|109.11|   109.11|
>>>>> |Mich|   50010|Mich's 10th order|210.11|   210.11|
>>>>> +----+--------+-----------------+------+---------+
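>>>>> To go beyond a single user, the same scan works with the u.name
>>>>> filter dropped; every user's orders then come back in one result set.
>>>>> An untested sketch of that variant, with the DENSE_RANK column left
>>>>> out since it is not used in the output:
>>>>>
>>>>>     sqltext_all = """
>>>>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>>>>     FROM
>>>>>     (
>>>>>         SELECT
>>>>>             user_id
>>>>>           , id AS order_id
>>>>>           , description
>>>>>           , amount
>>>>>           , MAX(amount) OVER (PARTITION BY user_id ORDER BY id) AS maxorders
>>>>>         FROM orders
>>>>>     ) t
>>>>>     INNER JOIN users u ON t.user_id = u.id
>>>>>     ORDER BY u.name, t.order_id
>>>>>     """
>>>>>     spark.sql(sqltext_all).show(50)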
>>>>> You can start on this. Happy coding.
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Architect/Engineering Lead
>>>>> Palantir Technologies Limited
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>> view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which
>>>>> may arise from relying on this email's technical content is
>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>
>>>>> On Tue, 25 Apr 2023 at 18:50, Marco Costantini <
>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>
>>>>>> Thanks Mich,
>>>>>>
>>>>>> Great idea. I have done it. Those files are attached. I'm interested
>>>>>> to know your thoughts. Let's imagine this same structure, but with
>>>>>> huge amounts of data as well.
>>>>>>
>>>>>> Please and thank you,
>>>>>> Marco.
>>>>>>
>>>>>> On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Marco,
>>>>>>>
>>>>>>> Let us start simple.
>>>>>>>
>>>>>>> Provide a csv file of 5 rows for the users table. Each row has a
>>>>>>> unique user_id and one or two other columns, like a fictitious
>>>>>>> email etc.
>>>>>>>
>>>>>>> Also, for each user_id, provide 10 rows for the orders table,
>>>>>>> meaning that the orders table has 10 rows per user_id, 5 x 10 rows
>>>>>>> in total.
>>>>>>>
>>>>>>> Both as comma separated csv files.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>> Palantir Technologies Limited
>>>>>>> London
>>>>>>> United Kingdom
>>>>>>>
>>>>>>> view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property
>>>>>>> which may arise from relying on this email's technical content is
>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>
>>>>>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>>>
>>>>>>>> Thanks Mich,
>>>>>>>> I have not, but I will certainly read up on this today.
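>>>>>>>> From a first skim, I gather the shape is roughly this. Rough,
>>>>>>>> untested notes of mine, against my own orders schema (user_id,
>>>>>>>> timestamp, total, description):
>>>>>>>>
>>>>>>>> ```
>>>>>>>> from pyspark.sql import Window
>>>>>>>> from pyspark.sql import functions as F
>>>>>>>>
>>>>>>>> # Rank each user's orders, newest first, without collapsing rows.
>>>>>>>> w = Window.partitionBy('user_id').orderBy(F.col('timestamp').desc())
>>>>>>>> ranked = orders_df.withColumn('order_rank', F.row_number().over(w))
>>>>>>>>
>>>>>>>> # e.g. keep only each user's latest order:
>>>>>>>> latest = ranked.filter(F.col('order_rank') == 1)
>>>>>>>> ```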
>>>>>>>> To your point that all of the essential data is in the 'orders'
>>>>>>>> table: I agree! That distills the problem nicely. Yet, I still
>>>>>>>> have some questions on which someone may be able to shed some
>>>>>>>> light.
>>>>>>>>
>>>>>>>> 1) If my 'orders' table is very large and needs to be aggregated
>>>>>>>> by 'user_id', how will Spark intelligently optimize on that
>>>>>>>> constraint (only read data for the relevant 'user_id's)? Is that
>>>>>>>> something I have to instruct Spark to do?
>>>>>>>>
>>>>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>>>>> search too much?
>>>>>>>>
>>>>>>>> Please, if you have any links to documentation I can read on *how*
>>>>>>>> Spark works under the hood for these operations, I would
>>>>>>>> appreciate it if you shared them. Spark has become a pillar on my
>>>>>>>> team and knowing it in more detail is warranted.
>>>>>>>>
>>>>>>>> Slightly pivoting the subject here: I have tried something. It was
>>>>>>>> a suggestion by an AI chat bot and it seemed reasonable. In my
>>>>>>>> main Spark script I now have the line:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> grouped_orders_df = orders_df.groupBy('user_id').agg(
>>>>>>>>     collect_list(to_json(struct('user_id', 'timestamp', 'total',
>>>>>>>>                                 'description'))).alias('orders'))
>>>>>>>> ```
>>>>>>>> (json is ultimately needed)
>>>>>>>>
>>>>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>>>>> single Array column. Now my worry is: will this column become too
>>>>>>>> large if there are a great many orders? Is there a limit? I have
>>>>>>>> searched for documentation on such a limit but could not find any.
>>>>>>>>
>>>>>>>> I truly appreciate your help Mich and team,
>>>>>>>> Marco.
>>>>>>>>
>>>>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Have you thought of using windowing functions
>>>>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>
>>>>>>>>> to achieve this?
>>>>>>>>>
>>>>>>>>> Effectively all your information is in the orders table.
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>> Mich Talebzadeh,
>>>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>>>> Palantir Technologies Limited
>>>>>>>>> London
>>>>>>>>> United Kingdom
>>>>>>>>>
>>>>>>>>> view my Linkedin profile
>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>
>>>>>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>>>
>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>>> for any loss, damage or destruction of data or any other property
>>>>>>>>> which may arise from relying on this email's technical content is
>>>>>>>>> explicitly disclaimed. The author will in no case be liable for
>>>>>>>>> any monetary damages arising from such loss, damage or
>>>>>>>>> destruction.
>>>>>>>>>
>>>>>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>>>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have two tables: {users, orders}. In this example, let's say
>>>>>>>>>> that for each 1 User in the users table, there are 100000 Orders
>>>>>>>>>> in the orders table.
>>>>>>>>>>
>>>>>>>>>> I have to use pyspark to generate a statement of Orders for each
>>>>>>>>>> User. So, a single user will need his/her own list of Orders.
>>>>>>>>>> Additionally, I need to send this statement to the real-world
>>>>>>>>>> user via email (for example).
>>>>>>>>>>
>>>>>>>>>> My first intuition was to apply a DataFrame.foreach() on the
>>>>>>>>>> users DataFrame. This way, I can rely on the Spark workers to
>>>>>>>>>> handle the email sending individually. However, I now do not
>>>>>>>>>> know the best way to get each User's Orders.
>>>>>>>>>>
>>>>>>>>>> I will soon try the following (pseudo-code):
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> users_df = <my entire users DataFrame>
>>>>>>>>>> orders_df = <my entire orders DataFrame>
>>>>>>>>>>
>>>>>>>>>> # this is poorly named for max understandability in this context
>>>>>>>>>> def foreach_function(row):
>>>>>>>>>>     user_id = row.user_id
>>>>>>>>>>     # filter (rather than select) down to this user's orders
>>>>>>>>>>     user_orders_df = orders_df.where(f'user_id = {user_id}')
>>>>>>>>>>
>>>>>>>>>>     # here, I'd get any User info from 'row'
>>>>>>>>>>     # then, I'd convert all 'user_orders' to JSON
>>>>>>>>>>     # then, I'd prepare the email and send it
>>>>>>>>>>
>>>>>>>>>> users_df.foreach(foreach_function)
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>>>>>> foreach function, I will capitalize on Spark's scalability when
>>>>>>>>>> doing that work. However, I am worried about two things:
>>>>>>>>>>
>>>>>>>>>> If I take all Orders up front...
>>>>>>>>>>
>>>>>>>>>> Will that work?
>>>>>>>>>> Will I be taking too much? Will I be taking Orders on partitions
>>>>>>>>>> that won't handle them (a different User)?
>>>>>>>>>>
>>>>>>>>>> If I create the orders_df (filtered) within the foreach
>>>>>>>>>> function...
>>>>>>>>>>
>>>>>>>>>> Will it work?
>>>>>>>>>> Will that be too much IO to the DB?
>>>>>>>>>>
>>>>>>>>>> The question ultimately is: how can I achieve this goal
>>>>>>>>>> efficiently?
>>>>>>>>>>
>>>>>>>>>> I have not yet tried anything here. I am doing so as we speak,
>>>>>>>>>> but am suffering from choice-paralysis.
>>>>>>>>>>
>>>>>>>>>> Please and thank you.
>>
>> --
>> Best Regards,
>> Ayan Guha
>
> --
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising
> from such loss, damage or destruction.