Thanks team,
Email was just an example. The point was to illustrate that some actions
could be chained using Spark's foreach. In reality, this is an S3 write and
a Kafka message production, which I think is quite reasonable for spark to
do.

To answer Ayan's first question. Yes, all a users orders, prepared for each
and every user.

Other than the remarks that email transmission is unwise (which I've now
reminded is irrelevant) I am not seeing an alternative to using Spark's
foreach. Unless, your proposal is for the Spark job to target 1 user, and
just run the job 1000's of times taking the user_id as input. That doesn't
sound attractive.

Also, while we say that foreach is not optimal, I cannot find any evidence
of it; neither here nor online. If there are any docs about the inner
workings of this functionality, please pass them to me. I continue to
search for them. Even late last night!

Thanks for your help team,
Marco.

On Wed, Apr 26, 2023 at 6:21 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Indeed very valid points by Ayan. How email is going to handle 1000s of
> records. As a solution architect I tend to replace. Users by customers and
> for each order there must be products sort of many to many relationship. If
> I was a customer I would also be interested in product details as
> well.sending via email sounds like a Jurassic park solution 😗
>
> On Wed, 26 Apr 2023 at 10:24, ayan guha <guha.a...@gmail.com> wrote:
>
>> Adding to what Mitch said,
>>
>> 1. Are you trying to send statements of all orders to all users? Or the
>> latest order only?
>>
>> 2. Sending email is not a good use of spark. instead, I suggest to use a
>> notification service or function. Spark should write to a queue (kafka,
>> sqs...pick your choice here).
>>
>> Best regards
>> Ayan
>>
>> On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Well OK in a nutshell you want the result set for every user prepared
>>> and email to that user right.
>>>
>>> This is a form of ETL where those result sets need to be posted
>>> somewhere. Say you create a table based on the result set prepared for each
>>> user. You may have many raw target tables at the end of the first ETL. How
>>> does this differ from using forEach? Performance wise forEach may not be
>>> optimal.
>>>
>>> Can you take the sample tables and try your method?
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
>>> marco.costant...@rocketfncl.com> wrote:
>>>
>>>> Hi Mich,
>>>> First, thank you for that. Great effort put into helping.
>>>>
>>>> Second, I don't think this tackles the technical challenge here. I
>>>> understand the windowing as it serves those ranks you created, but I don't
>>>> see how the ranks contribute to the solution.
>>>> Third, the core of the challenge is about performing this kind of
>>>> 'statement' but for all users. In this example we target Mich, but that
>>>> reduces the complexity by a lot! In fact, a simple join and filter would
>>>> solve that one.
>>>>
>>>> Any thoughts on that? For me, the foreach is desirable because I can
>>>> have the workers chain other actions to each iteration (send email, send
>>>> HTTP request, etc).
>>>>
>>>> Thanks Mich,
>>>> Marco.
>>>>
>>>> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> First thoughts.
>>>>>
>>>>> foreach() is an action operation that is to iterate/loop over each
>>>>> element in the dataset, meaning cursor based. That is different from
>>>>> operating over the dataset as a set which is far more efficient.
>>>>>
>>>>> So in your case as I understand it correctly, you want to get order
>>>>> for each user (say Mich), convert the result set to json and send it to
>>>>> Mich via email
>>>>>
>>>>> Let us try this based on sample data
>>>>>
>>>>> Put your csv files into HDFS directory
>>>>>
>>>>> hdfs dfs -put users.csv /data/stg/test
>>>>> hdfs dfs -put orders.csv /data/stg/test
>>>>>
>>>>> Then create dataframes from csv files, create temp views and do a join
>>>>> on result sets with some slicing and dicing on orders table
>>>>>
>>>>> #! /usr/bin/env python3
>>>>> from __future__ import print_function
>>>>> import sys
>>>>> import findspark
>>>>> findspark.init()
>>>>> from pyspark.sql import SparkSession
>>>>> from pyspark import SparkContext
>>>>> from pyspark.sql import SQLContext, HiveContext
>>>>> from pyspark.sql.window import Window
>>>>>
>>>>> def spark_session(appName):
>>>>>   return SparkSession.builder \
>>>>>         .appName(appName) \
>>>>>         .enableHiveSupport() \
>>>>>         .getOrCreate()
>>>>>
>>>>> def main():
>>>>>     appName = "ORDERS"
>>>>>     spark =spark_session(appName)
>>>>>     # get the sample
>>>>>     users_file="hdfs://rhes75:9000/data/stg/test/users.csv"
>>>>>     orders_file="hdfs://rhes75:9000/data/stg/test/orders.csv"
>>>>>     users_df =
>>>>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>>>>> "true").option("header", "true").load(users_file)
>>>>>     users_df.printSchema()
>>>>>     """
>>>>>     root
>>>>>     |-- id: integer (nullable = true)
>>>>>     |-- name: string (nullable = true)
>>>>>     """
>>>>>
>>>>>     print(f"""\n Reading from  {users_file}\n""")
>>>>>     users_df.show(5,False)
>>>>>     orders_df =
>>>>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>>>>> "true").option("header", "true").load(orders_file)
>>>>>     orders_df.printSchema()
>>>>>     """
>>>>>     root
>>>>>     |-- id: integer (nullable = true)
>>>>>     |-- description: string (nullable = true)
>>>>>     |-- amount: double (nullable = true)
>>>>>     |-- user_id: integer (nullable = true)
>>>>>      """
>>>>>     print(f"""\n Reading from  {orders_file}\n""")
>>>>>     orders_df.show(50,False)
>>>>>     users_df.createOrReplaceTempView("users")
>>>>>     orders_df.createOrReplaceTempView("orders")
>>>>>     # Create a list of orders for each user
>>>>>     print(f"""\n Doing a join on two temp views\n""")
>>>>>
>>>>>     sqltext = """
>>>>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>>>>     FROM
>>>>>     (
>>>>>     SELECT
>>>>>             user_id AS user_id
>>>>>         ,   id as order_id
>>>>>         ,   description as description
>>>>>         ,   amount AS amount
>>>>>         ,  DENSE_RANK() OVER (PARTITION by user_id ORDER BY amount) AS
>>>>> RANK
>>>>>         ,  MAX(amount) OVER (PARTITION by user_id ORDER BY id) AS
>>>>> maxorders
>>>>>     FROM orders
>>>>>     ) t
>>>>>     INNER JOIN users u ON t.user_id = u.id
>>>>>     AND  u.name = 'Mich'
>>>>>     ORDER BY t.order_id
>>>>>     """
>>>>>     spark.sql(sqltext).show(50)
>>>>> if __name__ == '__main__':
>>>>>     main()
>>>>>
>>>>> Final outcome displaying orders for user Mich
>>>>>
>>>>> Doing a join on two temp views
>>>>>
>>>>>  Doing a join on two temp views
>>>>>
>>>>> +----+--------+-----------------+------+---------+
>>>>> |name|order_id|      description|amount|maxorders|
>>>>> +----+--------+-----------------+------+---------+
>>>>> |Mich|   50001| Mich's 1st order|101.11|   101.11|
>>>>> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
>>>>> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
>>>>> |Mich|   50004| Mich's 4th order|104.11|   104.11|
>>>>> |Mich|   50005| Mich's 5th order|105.11|   105.11|
>>>>> |Mich|   50006| Mich's 6th order|106.11|   106.11|
>>>>> |Mich|   50007| Mich's 7th order|107.11|   107.11|
>>>>> |Mich|   50008| Mich's 8th order|108.11|   108.11|
>>>>> |Mich|   50009| Mich's 9th order|109.11|   109.11|
>>>>> |Mich|   50010|Mich's 10th order|210.11|   210.11|
>>>>> +----+--------+-----------------+------+---------+
>>>>>
>>>>> You can start on this.  Happy coding
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Architect/Engineering Lead
>>>>> Palantir Technologies Limited
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 25 Apr 2023 at 18:50, Marco Costantini <
>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>
>>>>>> Thanks Mich,
>>>>>>
>>>>>> Great idea. I have done it. Those files are attached. I'm interested
>>>>>> to know your thoughts. Let's imagine this same structure, but with huge
>>>>>> amounts of data as well.
>>>>>>
>>>>>> Please and thank you,
>>>>>> Marco.
>>>>>>
>>>>>> On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Marco,
>>>>>>>
>>>>>>> Let us start simple,
>>>>>>>
>>>>>>> Provide a csv file of 5 rows for the users table. Each row has a
>>>>>>> unique user_id and one or two other columns like fictitious email etc.
>>>>>>>
>>>>>>> Also for each user_id, provide 10 rows of orders table, meaning that
>>>>>>> orders table has 5 x 10 rows for each user_id.
>>>>>>>
>>>>>>> both as comma separated csv file
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>> Palantir Technologies Limited
>>>>>>> London
>>>>>>> United Kingdom
>>>>>>>
>>>>>>>
>>>>>>>    view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>>>
>>>>>>>> Thanks Mich,
>>>>>>>> I have not but I will certainly read up on this today.
>>>>>>>>
>>>>>>>> To your point that all of the essential data is in the 'orders'
>>>>>>>> table; I agree! That distills the problem nicely. Yet, I still have 
>>>>>>>> some
>>>>>>>> questions on which someone may be able to shed some light.
>>>>>>>>
>>>>>>>> 1) If my 'orders' table is very large, and will need to be
>>>>>>>> aggregated by 'user_id', how will Spark intelligently optimize on that
>>>>>>>> constraint (only read data for relevent 'user_id's). Is that something 
>>>>>>>> I
>>>>>>>> have to instruct Spark to do?
>>>>>>>>
>>>>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>>>>> search too much?
>>>>>>>>
>>>>>>>> Please, if you have any links to documentation I can read on *how*
>>>>>>>> Spark works under the hood for these operations, I would appreciate it 
>>>>>>>> if
>>>>>>>> you give them. Spark has become a pillar on my team and knowing it in 
>>>>>>>> more
>>>>>>>> detail is warranted.
>>>>>>>>
>>>>>>>> Slightly pivoting the subject here; I have tried something. It was
>>>>>>>> a suggestion by an AI chat bot and it seemed reasonable. In my main 
>>>>>>>> Spark
>>>>>>>> script I now have the line:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> grouped_orders_df =
>>>>>>>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>>>>>>>> 'timestamp', 'total', 'description'))).alias('orders'))
>>>>>>>> ```
>>>>>>>> (json is ultimately needed)
>>>>>>>>
>>>>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>>>>> single Array column. Now my worry is, will this column become too 
>>>>>>>> large if
>>>>>>>> there are a great many orders. Is there a limit? I have search for
>>>>>>>> documentation on such a limit but could not find any.
>>>>>>>>
>>>>>>>> I truly appreciate your help Mich and team,
>>>>>>>> Marco.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Have you thought of using  windowing function
>>>>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>>>>>>> achieve this?
>>>>>>>>>
>>>>>>>>> Effectively all your information is in the orders table.
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>> Mich Talebzadeh,
>>>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>>>> Palantir Technologies Limited
>>>>>>>>> London
>>>>>>>>> United Kingdom
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>    view my Linkedin profile
>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>>> for any loss, damage or destruction of data or any other property 
>>>>>>>>> which may
>>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>>>> damages
>>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>>>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have two tables: {users, orders}. In this example, let's say
>>>>>>>>>> that for each 1 User in the users table, there are 100000 Orders in 
>>>>>>>>>> the
>>>>>>>>>> orders table.
>>>>>>>>>>
>>>>>>>>>> I have to use pyspark to generate a statement of Orders for each
>>>>>>>>>> User. So, a single user will need his/her own list of Orders. 
>>>>>>>>>> Additionally,
>>>>>>>>>> I need to send this statement to the real-world user via email (for
>>>>>>>>>> example).
>>>>>>>>>>
>>>>>>>>>> My first intuition was to apply a DataFrame.foreach() on the
>>>>>>>>>> users DataFrame. This way, I can rely on the spark workers to handle 
>>>>>>>>>> the
>>>>>>>>>> email sending individually. However, I now do not know the best way 
>>>>>>>>>> to get
>>>>>>>>>> each User's Orders.
>>>>>>>>>>
>>>>>>>>>> I will soon try the following (pseudo-code):
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> users_df = <my entire users DataFrame>
>>>>>>>>>> orders_df = <my entire orders DataFrame>
>>>>>>>>>>
>>>>>>>>>> #this is poorly named for max understandability in this context
>>>>>>>>>> def foreach_function(row):
>>>>>>>>>>   user_id = row.user_id
>>>>>>>>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>>>>>>>>
>>>>>>>>>>   #here, I'd get any User info from 'row'
>>>>>>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>>>>>>   #then, I'd prepare the email and send it
>>>>>>>>>>
>>>>>>>>>> users_df.foreach(foreach_function)
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>>>>>> foreach function, I will capitalize on Spark's scalability when 
>>>>>>>>>> doing that
>>>>>>>>>> work. However, I am worried of two things:
>>>>>>>>>>
>>>>>>>>>> If I take all Orders up front...
>>>>>>>>>>
>>>>>>>>>> Will that work?
>>>>>>>>>> Will I be taking too much? Will I be taking Orders on partitions
>>>>>>>>>> who won't handle them (different User).
>>>>>>>>>>
>>>>>>>>>> If I create the orders_df (filtered) within the foreach
>>>>>>>>>> function...
>>>>>>>>>>
>>>>>>>>>> Will it work?
>>>>>>>>>> Will that be too much IO to DB?
>>>>>>>>>>
>>>>>>>>>> The question ultimately is: How can I achieve this goal
>>>>>>>>>> efficiently?
>>>>>>>>>>
>>>>>>>>>> I have not yet tried anything here. I am doing so as we speak,
>>>>>>>>>> but am suffering from choice-paralysis.
>>>>>>>>>>
>>>>>>>>>> Please and thank you.
>>>>>>>>>>
>>>>>>>>> --
>> Best Regards,
>> Ayan Guha
>>
> --
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>

Reply via email to