Thanks team,
Email was just an example. The point was to illustrate that some actions
could be chained using Spark's foreach. In reality, this is an S3 write and
a Kafka message production, which I think is quite reasonable for Spark to do.

To answer Ayan's first question: yes, all of a user's orders, prepared for each
and every user.

Other than the remarks that email transmission is unwise (which, as I've now
pointed out, is irrelevant), I don't see an alternative to using Spark's
foreach, unless your proposal is for the Spark job to target one user and
simply run the job thousands of times, taking the user_id as input. That
doesn't sound attractive.

Also, while we say that foreach is not optimal, I cannot find any evidence
of it, neither here nor online. If there are any docs about the inner
workings of this functionality, please pass them on to me. I am still
searching for them, even late last night!

Thanks for your help team,

On Wed, Apr 26, 2023 at 6:21 AM Mich Talebzadeh wrote:

> Indeed, very valid points by Ayan. How is email going to handle 1000s of
> records? As a solution architect, I tend to replace users with customers, and
> for each order there must be products, a sort of many-to-many relationship. If
> I were a customer, I would also be interested in the product details as
> well. Sending via email sounds like a Jurassic Park solution 😗
On Wed, 26 Apr 2023 at 10:24, ayan guha wrote:
>> Adding to what Mich said,
>> 1. Are you trying to send statements of all orders to all users? Or the
>> latest order only?
>> 2. Sending email is not a good use of Spark. Instead, I suggest using a
>> notification service or function. Spark should write to a queue (Kafka,
>> SQS... take your pick).
>> Best regards
>> Ayan
On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh wrote:
>>> Well, OK. In a nutshell, you want the result set for every user prepared
>>> and emailed to that user, right?
>>> This is a form of ETL where those result sets need to be posted
>>> somewhere. Say you create a table based on the result set prepared for each
>>> user. You may have many raw target tables at the end of the first ETL. How
>>> does this differ from using foreach? Performance-wise, foreach may not be
>>> optimal.
>>> Can you take the sample tables and try your method?
>>> HTH
On Wed, 26 Apr 2023 at 04:10, Marco Costantini wrote:
>>>> Hi Mich,
>>>> First, thank you for that. Great effort put into helping.
>>>> Second, I don't think this tackles the technical challenge here. I
>>>> understand the windowing as it serves those ranks you created, but I don't
>>>> see how the ranks contribute to the solution.
>>>> Third, the core of the challenge is about performing this kind of
>>>> 'statement' but for all users. In this example we target Mich, but that
>>>> reduces the complexity by a lot! In fact, a simple join and filter would
>>>> solve that one.
>>>> Any thoughts on that? For me, the foreach is desirable because I can
>>>> have the workers chain other actions to each iteration (send email, send
>>>> HTTP request, etc.).
>>>> Thanks Mich,
>>>> Marco.
On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh wrote:
>>>>> Hi Marco,
>>>>> First thoughts.
>>>>> foreach() is an action that iterates over each element in the
>>>>> dataset, meaning it is cursor-based. That is different from operating
>>>>> over the dataset as a set, which is far more efficient.
>>>>> So, if I understand your case correctly, you want to get the orders
>>>>> for each user (say, Mich), convert the result set to JSON and send it to
>>>>> Mich via email.
>>>>> Let us try this based on sample data.
>>>>> Put your CSV files into an HDFS directory:
>>>>> hdfs dfs -put users.csv /data/stg/test
>>>>> hdfs dfs -put orders.csv /data/stg/test
>>>>> Then create DataFrames from the CSV files, create temp views and do a join
>>>>> on the result sets, with some slicing and dicing on the orders table:
>>>>> #! /usr/bin/env python3
>>>>> import findspark
>>>>> findspark.init()  # optional: adds a local Spark installation to sys.path
>>>>> from pyspark.sql import SparkSession
>>>>> def spark_session(appName):
>>>>>   return SparkSession.builder \
>>>>>         .appName(appName) \
>>>>>         .enableHiveSupport() \
>>>>>         .getOrCreate()
>>>>> def main():
>>>>>     appName = "ORDERS"
>>>>>     spark =spark_session(appName)
>>>>>     # get the sample
>>>>>     users_file="hdfs://rhes75:9000/data/stg/test/users.csv"
>>>>>     orders_file="hdfs://rhes75:9000/data/stg/test/orders.csv"
>>>>>     users_df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(users_file)
>>>>>     users_df.printSchema()
>>>>>     """
>>>>>     root
>>>>>     |-- id: integer (nullable = true)
>>>>>     |-- name: string (nullable = true)
>>>>>     """
>>>>>     print(f"""\n Reading from  {users_file}\n""")
>>>>>     orders_df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(orders_file)
>>>>>     orders_df.printSchema()
>>>>>     """
>>>>>     root
>>>>>     |-- id: integer (nullable = true)
>>>>>     |-- description: string (nullable = true)
>>>>>     |-- amount: double (nullable = true)
>>>>>     |-- user_id: integer (nullable = true)
>>>>>      """
>>>>>     print(f"""\n Reading from  {orders_file}\n""")
>>>>>     users_df.createOrReplaceTempView("users")
>>>>>     orders_df.createOrReplaceTempView("orders")
>>>>>     # Create a list of orders for each user
>>>>>     print(f"""\n Doing a join on two temp views\n""")
>>>>>     sqltext = """
>>>>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>>>>     FROM
>>>>>     (
>>>>>     SELECT
>>>>>             user_id AS user_id
>>>>>         ,   id as order_id
>>>>>         ,   description as description
>>>>>         ,   amount AS amount
>>>>>         ,  DENSE_RANK() OVER (PARTITION BY user_id ORDER BY amount) AS RANK
>>>>>         ,  MAX(amount) OVER (PARTITION BY user_id ORDER BY id) AS maxorders
>>>>>     FROM orders
>>>>>     ) t
>>>>>     INNER JOIN users u ON t.user_id = u.id
>>>>>     AND u.name = 'Mich'
>>>>>     ORDER BY t.order_id
>>>>>     """
>>>>>     spark.sql(sqltext).show(50)
>>>>> if __name__ == '__main__':
>>>>>     main()
>>>>> Final outcome displaying orders for user Mich
>>>>> Doing a join on two temp views
>>>>> +----+--------+-----------------+------+---------+
>>>>> |name|order_id|      description|amount|maxorders|
>>>>> +----+--------+-----------------+------+---------+
>>>>> |Mich|   50001| Mich's 1st order|101.11|   101.11|
>>>>> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
>>>>> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
>>>>> |Mich|   50004| Mich's 4th order|104.11|   104.11|
>>>>> |Mich|   50005| Mich's 5th order|105.11|   105.11|
>>>>> |Mich|   50006| Mich's 6th order|106.11|   106.11|
>>>>> |Mich|   50007| Mich's 7th order|107.11|   107.11|
>>>>> |Mich|   50008| Mich's 8th order|108.11|   108.11|
>>>>> |Mich|   50009| Mich's 9th order|109.11|   109.11|
>>>>> |Mich|   50010|Mich's 10th order|210.11|   210.11|
>>>>> +----+--------+-----------------+------+---------+
>>>>> You can start from this. Happy coding!
On Tue, 25 Apr 2023 at 18:50, Marco Costantini wrote:
>>>>>> Thanks Mich,
>>>>>> Great idea. I have done it. Those files are attached. I'm interested
>>>>>> to know your thoughts. Let's imagine this same structure, but with huge
>>>>>> amounts of data as well.
>>>>>> Please and thank you,
>>>>>> Marco.
On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh wrote:
>>>>>>> Hi Marco,
>>>>>>> Let us start simple.
>>>>>>> Provide a CSV file of 5 rows for the users table. Each row has a
>>>>>>> unique user_id and one or two other columns, like a fictitious email.
>>>>>>> Also, for each user_id, provide 10 rows of the orders table, meaning that
>>>>>>> the orders table has 5 x 10 = 50 rows, 10 for each user_id.
>>>>>>> Both as comma-separated CSV files.
>>>>>>> HTH
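A small generator for the requested sample files, sketched with Python's standard `csv` module. The column names match those used in Mich's script in this thread (`id, name` for users; `id, description, amount, user_id` for orders); the names, ids and amounts are made up for illustration:

```python
import csv

NUM_USERS = 5
ORDERS_PER_USER = 10


def write_sample_csvs(users_path: str = "users.csv", orders_path: str = "orders.csv") -> None:
    """Write 5 users and 10 orders per user (50 order rows), each file with a header row."""
    with open(users_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "name"])
        for user_id in range(1, NUM_USERS + 1):
            writer.writerow([user_id, f"user{user_id}"])

    with open(orders_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "description", "amount", "user_id"])
        order_id = 50001
        for user_id in range(1, NUM_USERS + 1):
            for n in range(1, ORDERS_PER_USER + 1):
                amount = round(100 + n + 0.11, 2)  # e.g. 101.11, 102.11, ...
                writer.writerow([order_id, f"user{user_id} order {n}", amount, user_id])
                order_id += 1


if __name__ == "__main__":
    write_sample_csvs()
```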
On Tue, 25 Apr 2023 at 14:07, Marco Costantini wrote:
>>>>>>>> Thanks Mich,
>>>>>>>> I have not, but I will certainly read up on this today.
>>>>>>>> To your point that all of the essential data is in the 'orders'
>>>>>>>> table: I agree! That distills the problem nicely. Yet, I still have some
>>>>>>>> questions on which someone may be able to shed some light.
>>>>>>>> 1) If my 'orders' table is very large and will need to be aggregated
>>>>>>>> by 'user_id', how will Spark intelligently optimize on that constraint
>>>>>>>> (only read data for relevant 'user_id's)? Is that something I have to
>>>>>>>> instruct Spark to do?
>>>>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>>>>> search too much?
>>>>>>>> Please, if you have any links to documentation I can read on *how*
>>>>>>>> Spark works under the hood for these operations, I would appreciate it
>>>>>>>> if you could share them. Spark has become a pillar on my team, and
>>>>>>>> knowing it in more detail is warranted.
>>>>>>>> Slightly pivoting the subject here: I have tried something. It was
>>>>>>>> a suggestion by an AI chatbot and it seemed reasonable. In my main Spark
>>>>>>>> script I now have the line:
>>>>>>>> ```
>>>>>>>> from pyspark.sql.functions import collect_list, struct, to_json
>>>>>>>> grouped_orders_df = orders_df.groupBy('user_id').agg(
>>>>>>>>     collect_list(to_json(struct('user_id', 'timestamp', 'total', 'description'))).alias('orders'))
>>>>>>>> ```
>>>>>>>> (JSON is ultimately needed.)
>>>>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>>>>> single array column. Now my worry is whether this column will become too
>>>>>>>> large if there are a great many orders. Is there a limit? I have searched
>>>>>>>> for documentation on such a limit but could not find any.
>>>>>>>> I truly appreciate your help Mich and team,
>>>>>>>> Marco.
On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh wrote:
>>>>>>>>> Have you thought of using windowing functions to
>>>>>>>>> achieve this?
>>>>>>>>> Effectively all your information is in the orders table.
>>>>>>>>> HTH
On Tue, 25 Apr 2023 at 00:15, Marco Costantini wrote:
>>>>>>>>>> I have two tables: {users, orders}. In this example, let's say
>>>>>>>>>> that for each User in the users table, there are 100,000 Orders in
>>>>>>>>>> the orders table.
>>>>>>>>>> I have to use PySpark to generate a statement of Orders for each
>>>>>>>>>> User. So, a single user will need his/her own list of Orders.
>>>>>>>>>> Additionally, I need to send this statement to the real-world user
>>>>>>>>>> via email (for example).
>>>>>>>>>> My first intuition was to apply a DataFrame.foreach() on the
>>>>>>>>>> users DataFrame. This way, I can rely on the Spark workers to handle
>>>>>>>>>> the email sending individually. However, I now do not know the best
>>>>>>>>>> way to get each User's Orders.
>>>>>>>>>> I will soon try the following (pseudo-code):
>>>>>>>>>> ```
>>>>>>>>>> users_df = <my entire users DataFrame>
>>>>>>>>>> orders_df = <my entire orders DataFrame>
>>>>>>>>>> #this is poorly named for max understandability in this context
>>>>>>>>>> def foreach_function(row):
>>>>>>>>>>   user_id = row.user_id
>>>>>>>>>>   user_orders_df = orders_df.where(f'user_id = {user_id}')
>>>>>>>>>>   #here, I'd get any User info from 'row'
>>>>>>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>>>>>>   #then, I'd prepare the email and send it
>>>>>>>>>> users_df.foreach(foreach_function)
>>>>>>>>>> ```
>>>>>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>>>>>> foreach function, I will capitalize on Spark's scalability when doing
>>>>>>>>>> that work. However, I am worried about two things:
>>>>>>>>>> 1. If I take all Orders up front:
>>>>>>>>>>    - Will that work?
>>>>>>>>>>    - Will I be taking too much? Will I be taking Orders on partitions
>>>>>>>>>>      that won't handle them (different User)?
>>>>>>>>>> 2. If I create the orders_df (filtered) within the foreach function:
>>>>>>>>>>    - Will it work?
>>>>>>>>>>    - Will that be too much I/O to the DB?
>>>>>>>>>> The question ultimately is: How can I achieve this goal
>>>>>>>>>> efficiently?
>>>>>>>>>> I have not yet tried anything here. I am doing so as we speak,
>>>>>>>>>> but am suffering from choice paralysis.
>>>>>>>>>> Please and thank you.
