Adding to what Mich said,

1. Are you trying to send statements of all orders to all users? Or the
latest order only?

2. Sending email is not a good use of Spark. Instead, I suggest using a
notification service or function. Spark should write to a queue (Kafka,
SQS... take your pick).
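
As a rough sketch of what I mean (not tested on a cluster): the function
below is the kind of thing I would hand to foreachPartition, so that each
partition publishes one JSON message per user and the notification service
does the emailing. The names publish_partition and send, the message layout
and the sample rows are my own assumptions; a plain Python list stands in for
the queue so the snippet runs without Spark or a broker.

```python
import json
from itertools import groupby
from operator import itemgetter


def publish_partition(rows, send):
    """Publish one JSON message per user from an iterator of order rows.

    rows: dicts with the orders columns (id, description, amount, user_id),
          assumed to arrive sorted by user_id within the partition.
    send: hypothetical callable that hands one message to the queue client.
    Returns the number of messages published.
    """
    count = 0
    for user_id, orders in groupby(rows, key=itemgetter("user_id")):
        message = {
            "user_id": user_id,
            "orders": [
                {k: order[k] for k in ("id", "description", "amount")}
                for order in orders
            ],
        }
        send(json.dumps(message))
        count += 1
    return count


# Made-up sample rows; user ids 1 and 2 are placeholders.
sample_rows = [
    {"id": 50001, "description": "Mich's 1st order", "amount": 101.11, "user_id": 1},
    {"id": 50002, "description": "Mich's 2nd order", "amount": 102.11, "user_id": 1},
    {"id": 50011, "description": "sample order", "amount": 99.0, "user_id": 2},
]

queue = []  # in-memory stand-in for Kafka/SQS
sent = publish_partition(iter(sample_rows), queue.append)
print(sent, "messages queued")

# With Spark the wiring could look roughly like this (send would have to be
# created on the worker, since queue clients usually cannot be pickled):
#   orders_df.repartition("user_id").sortWithinPartitions("user_id") \
#       .foreachPartition(lambda rows: publish_partition((r.asDict() for r in rows), send))
```

Repartitioning by user_id keeps all of a user's orders in one partition, and
sorting within the partition lets groupby see each user's rows together, so
nothing has to be collected on the driver.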

Best regards
Ayan

On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Well, OK. In a nutshell, you want the result set for every user prepared and
> emailed to that user, right?
>
> This is a form of ETL where those result sets need to be posted somewhere.
> Say you create a table based on the result set prepared for each user. You
> may have many raw target tables at the end of the first ETL. How does this
> differ from using forEach? Performance-wise, forEach may not be optimal.
>
> Can you take the sample tables and try your method?
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
> marco.costant...@rocketfncl.com> wrote:
>
>> Hi Mich,
>> First, thank you for that. Great effort put into helping.
>>
>> Second, I don't think this tackles the technical challenge here. I
>> understand the windowing as it serves those ranks you created, but I don't
>> see how the ranks contribute to the solution.
>> Third, the core of the challenge is about performing this kind of
>> 'statement' but for all users. In this example we target Mich, but that
>> reduces the complexity by a lot! In fact, a simple join and filter would
>> solve that one.
>>
>> Any thoughts on that? For me, the foreach is desirable because I can have
>> the workers chain other actions to each iteration (send email, send HTTP
>> request, etc).
>>
>> Thanks Mich,
>> Marco.
>>
>> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Marco,
>>>
>>> First thoughts.
>>>
>>> foreach() is an action that iterates over each element in the dataset,
>>> one row at a time (cursor-style). That is different from operating on the
>>> dataset as a set, which is far more efficient.
>>>
>>> So in your case, as I understand it, you want to get the orders for
>>> each user (say Mich), convert the result set to JSON and send it to Mich
>>> via email.
>>>
>>> Let us try this based on sample data
>>>
>>> Put your csv files into HDFS directory
>>>
>>> hdfs dfs -put users.csv /data/stg/test
>>> hdfs dfs -put orders.csv /data/stg/test
>>>
>>> Then create dataframes from csv files, create temp views and do a join
>>> on result sets with some slicing and dicing on orders table
>>>
>>> #!/usr/bin/env python3
>>> import findspark
>>> findspark.init()  # locate the local Spark installation before importing pyspark
>>> from pyspark.sql import SparkSession
>>>
>>> def spark_session(appName):
>>>     return SparkSession.builder \
>>>         .appName(appName) \
>>>         .enableHiveSupport() \
>>>         .getOrCreate()
>>>
>>> def main():
>>>     appName = "ORDERS"
>>>     spark = spark_session(appName)
>>>     # get the sample
>>>     users_file = "hdfs://rhes75:9000/data/stg/test/users.csv"
>>>     orders_file = "hdfs://rhes75:9000/data/stg/test/orders.csv"
>>>     # read the users csv with a header row and inferred column types
>>>     users_df = spark.read.csv(users_file, header=True, inferSchema=True)
>>>     users_df.printSchema()
>>>     """
>>>     root
>>>     |-- id: integer (nullable = true)
>>>     |-- name: string (nullable = true)
>>>     """
>>>
>>>     print(f"""\n Reading from  {users_file}\n""")
>>>     users_df.show(5, False)
>>>     # read the orders csv the same way
>>>     orders_df = spark.read.csv(orders_file, header=True, inferSchema=True)
>>>     orders_df.printSchema()
>>>     """
>>>     root
>>>     |-- id: integer (nullable = true)
>>>     |-- description: string (nullable = true)
>>>     |-- amount: double (nullable = true)
>>>     |-- user_id: integer (nullable = true)
>>>     """
>>>     print(f"""\n Reading from  {orders_file}\n""")
>>>     orders_df.show(50, False)
>>>     users_df.createOrReplaceTempView("users")
>>>     orders_df.createOrReplaceTempView("orders")
>>>     # Create a list of orders for each user
>>>     print(f"""\n Doing a join on two temp views\n""")
>>>
>>>     # maxorders is the running maximum of amount per user, in order-id order
>>>     sqltext = """
>>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>>     FROM
>>>     (
>>>     SELECT
>>>             user_id AS user_id
>>>         ,   id AS order_id
>>>         ,   description AS description
>>>         ,   amount AS amount
>>>         ,   DENSE_RANK() OVER (PARTITION BY user_id ORDER BY amount) AS RANK
>>>         ,   MAX(amount) OVER (PARTITION BY user_id ORDER BY id) AS maxorders
>>>     FROM orders
>>>     ) t
>>>     INNER JOIN users u ON t.user_id = u.id
>>>     AND  u.name = 'Mich'
>>>     ORDER BY t.order_id
>>>     """
>>>     spark.sql(sqltext).show(50)
>>>
>>> if __name__ == '__main__':
>>>     main()
>>>
>>> Final outcome displaying orders for user Mich
>>>
>>>  Doing a join on two temp views
>>>
>>> +----+--------+-----------------+------+---------+
>>> |name|order_id|      description|amount|maxorders|
>>> +----+--------+-----------------+------+---------+
>>> |Mich|   50001| Mich's 1st order|101.11|   101.11|
>>> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
>>> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
>>> |Mich|   50004| Mich's 4th order|104.11|   104.11|
>>> |Mich|   50005| Mich's 5th order|105.11|   105.11|
>>> |Mich|   50006| Mich's 6th order|106.11|   106.11|
>>> |Mich|   50007| Mich's 7th order|107.11|   107.11|
>>> |Mich|   50008| Mich's 8th order|108.11|   108.11|
>>> |Mich|   50009| Mich's 9th order|109.11|   109.11|
>>> |Mich|   50010|Mich's 10th order|210.11|   210.11|
>>> +----+--------+-----------------+------+---------+
>>>
>>> You can start on this.  Happy coding
>>>
>>> Mich Talebzadeh
>>>
>>> On Tue, 25 Apr 2023 at 18:50, Marco Costantini <
>>> marco.costant...@rocketfncl.com> wrote:
>>>
>>>> Thanks Mich,
>>>>
>>>> Great idea. I have done it. Those files are attached. I'm interested to
>>>> know your thoughts. Let's imagine this same structure, but with huge
>>>> amounts of data as well.
>>>>
>>>> Please and thank you,
>>>> Marco.
>>>>
>>>> On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> Let us start simple,
>>>>>
>>>>> Provide a CSV file of 5 rows for the users table. Each row has a
>>>>> unique user_id and one or two other columns, like a fictitious email.
>>>>>
>>>>> Also, for each user_id, provide 10 rows in the orders table, meaning that
>>>>> the orders table has 5 x 10 = 50 rows in total.
>>>>>
>>>>> Both as comma-separated CSV files.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh
>>>>>
>>>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>
>>>>>> Thanks Mich,
>>>>>> I have not but I will certainly read up on this today.
>>>>>>
>>>>>> To your point that all of the essential data is in the 'orders'
>>>>>> table; I agree! That distills the problem nicely. Yet, I still have some
>>>>>> questions on which someone may be able to shed some light.
>>>>>>
>>>>>> 1) If my 'orders' table is very large, and will need to be aggregated
>>>>>> by 'user_id', how will Spark intelligently optimize on that constraint
>>>>>> (only read data for relevant 'user_id's)? Is that something I have to
>>>>>> instruct Spark to do?
>>>>>>
>>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>>> search too much?
>>>>>>
>>>>>> Please, if you have any links to documentation I can read on *how*
>>>>>> Spark works under the hood for these operations, I would appreciate it if
>>>>>> you could share them. Spark has become a pillar on my team and knowing it
>>>>>> in more detail is warranted.
>>>>>>
>>>>>> Slightly pivoting the subject here; I have tried something. It was a
>>>>>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>>>>>> script I now have the line:
>>>>>>
>>>>>> ```
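>>>>>> from pyspark.sql.functions import collect_list, struct, to_json
>>>>>>
>>>>>> # one row per user, with that user's orders as an array of JSON strings
>>>>>> grouped_orders_df = orders_df.groupBy('user_id').agg(
>>>>>>     collect_list(to_json(struct('user_id', 'timestamp', 'total', 'description')))
>>>>>>     .alias('orders'))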
>>>>>> ```
>>>>>> (json is ultimately needed)
>>>>>>
>>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>>> single Array column. Now my worry is: will this column become too large
>>>>>> if there are a great many orders? Is there a limit? I have searched for
>>>>>> documentation on such a limit but could not find any.
>>>>>>
>>>>>> I truly appreciate your help Mich and team,
>>>>>> Marco.
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Have you thought of using windowing functions
>>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/> to
>>>>>>> achieve this?
>>>>>>>
>>>>>>> Effectively all your information is in the orders table.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh
>>>>>>>
>>>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>>>
>>>>>>>> I have two tables: {users, orders}. In this example, let's say that
>>>>>>>> for each 1 User in the users table, there are 100000 Orders in the
>>>>>>>> orders table.
>>>>>>>>
>>>>>>>> I have to use pyspark to generate a statement of Orders for each
>>>>>>>> User. So, a single user will need his/her own list of Orders.
>>>>>>>> Additionally, I need to send this statement to the real-world user via
>>>>>>>> email (for example).
>>>>>>>>
>>>>>>>> My first intuition was to apply a DataFrame.foreach() on the users
>>>>>>>> DataFrame. This way, I can rely on the spark workers to handle the
>>>>>>>> email sending individually. However, I now do not know the best way to
>>>>>>>> get each User's Orders.
>>>>>>>>
>>>>>>>> I will soon try the following (pseudo-code):
>>>>>>>>
>>>>>>>> ```
>>>>>>>> users_df = <my entire users DataFrame>
>>>>>>>> orders_df = <my entire orders DataFrame>
>>>>>>>>
>>>>>>>> #this is poorly named for max understandability in this context
>>>>>>>> def foreach_function(row):
>>>>>>>>   user_id = row.user_id
>>>>>>>>   user_orders_df = orders_df.filter(f'user_id = {user_id}')
>>>>>>>>
>>>>>>>>   #here, I'd get any User info from 'row'
>>>>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>>>>   #then, I'd prepare the email and send it
>>>>>>>>
>>>>>>>> users_df.foreach(foreach_function)
>>>>>>>> ```
>>>>>>>>
>>>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>>>> foreach function, I will capitalize on Spark's scalability when doing
>>>>>>>> that work. However, I am worried about two things:
>>>>>>>>
>>>>>>>> If I take all Orders up front...
>>>>>>>>
>>>>>>>> Will that work?
>>>>>>>> Will I be taking too much? Will I be taking Orders on partitions
>>>>>>>> that won't handle them (different User)?
>>>>>>>>
>>>>>>>> If I create the orders_df (filtered) within the foreach function...
>>>>>>>>
>>>>>>>> Will it work?
>>>>>>>> Will that be too much IO to DB?
>>>>>>>>
>>>>>>>> The question ultimately is: How can I achieve this goal efficiently?
>>>>>>>>
>>>>>>>> I have not yet tried anything here. I am doing so as we speak, but
>>>>>>>> am suffering from choice-paralysis.
>>>>>>>>
>>>>>>>> Please and thank you.
>>>>>>>>
>>>>>>> --
Best Regards,
Ayan Guha
