Great Amit, best of luck

Cheers,

Mich



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Jun 2021 at 18:39, Amit Joshi <mailtojoshia...@gmail.com> wrote:

> HI Mich,
>
> Thanks for your email.
> I have tried for the batch mode,
> Still looking to try in streaming mode.
> Will update you as per.
>
>
> Regards
> Amit Joshi
>
> On Thu, Jun 17, 2021 at 1:07 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> OK let us start with the basic cube
>>
>> create a DF first
>>
>> scala> val df = Seq(
>>      |   ("bar", 2L),
>>      |   ("bar", 2L),
>>      |   ("foo", 1L),
>>      |   ("foo", 2L)
>>      | ).toDF("word", "num")
>> df: org.apache.spark.sql.DataFrame = [word: string, num: bigint]
>>
>>
>> Now try cube on it
>>
>>
>> scala> df.cube($"word", $"num").count.sort(asc("word"), asc("num")).show
>>
>> +----+----+-----+
>> |word| num|count|
>> +----+----+-----+
>> |null|null|    4| Total rows in df
>> |null|   1|    1| Count where num equals 1
>> |null|   2|    3| Count where num equals 2
>> | bar|null|    2| Where word equals bar
>> | bar|   2|    2| Where word equals bar and num equals 2
>> | foo|null|    2| Where word equals foo
>> | foo|   1|    1| Where word equals foo and num equals 1
>> | foo|   2|    1| Where word equals foo and num equals 2
>> +----+----+-----+
>>
>>
>> and rollup
>>
>>
>> scala> df.rollup($"word",$"num").count.sort(asc("word"), asc("num")).show
>>
>>
>> +----+----+-----+
>> |word| num|count|
>> +----+----+-----+
>> |null|null|    4| Count of all rows
>> | bar|null|    2| Count when word is bar
>> | bar|   2|    2| Count when num is 2
>> | foo|null|    2| Count when word is foo
>> | foo|   1|    1| When word is foo and num is 1
>> | foo|   2|    1| When word is foo and num is 2
>> +----+----+-----+
>>
>>
>> So rollup() returns a subset of the rows returned by cube(). From the
>> above, rollup returns 6 rows whereas cube returns 8 rows. Here are the
>> missing rows.
>>
>> +----+----+-----+
>> |word| num|count|
>> +----+----+-----+
>> |null|   1|    1| Word is null and num is 1
>> |null|   2|    3| Word is null and num is 2
>> +----+----+-----+
>>
>> Now back to Spark Structured Streaming (SSS), we have basic aggregations
>>
>>
>>             """
>>             We work out the window and the AVG(temperature) in the
>> window's timeframe below
>>             This should return back the following Dataframe as struct
>>
>>              root
>>              |-- window: struct (nullable = false)
>>              |    |-- start: timestamp (nullable = true)
>>              |    |-- end: timestamp (nullable = true)
>>              |-- avg(temperature): double (nullable = true)
>>
>>             """
>>             resultM = resultC. \
>>                      withWatermark("timestamp", "5 minutes"). \
>>                      groupBy(window(resultC.timestamp, "5 minutes", "5
>> minutes")). \
>>                      avg('temperature')
>>
>>             # We take the above Dataframe and flatten it to get the
>> columns aliased as "startOfWindowFrame", "endOfWindowFrame" and
>> "AVGTemperature"
>>             resultMF = resultM. \
>>                        select( \
>>
>> F.col("window.start").alias("startOfWindowFrame") \
>>                           , F.col("window.end").alias("endOfWindowFrame")
>> \
>>                           ,
>> F.col("avg(temperature)").alias("AVGTemperature"))
>>
>> Now basic aggregation on singular columns can be done like
>> avg('temperature'),max(),stddev() etc
>>
>>
>> For cube() and rollup() I will require additional columns like location
>> etc in my kafka topic. Personally I have not tried it but it will be
>> interesting to see if it works.
>>
>>
>> Have you tried cube() first?
>>
>>
>> HTH
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 17 Jun 2021 at 07:44, Amit Joshi <mailtojoshia...@gmail.com>
>> wrote:
>>
>>> Hi Mich,
>>>
>>> Yes, you may think of cube rollups.
>>> Let me try to give an example:
>>> If we have a stream of data like (country,area,count, time), we would be
>>> able to get the updated count with different combinations of keys.
>>>
>>>> As example -
>>>>  (country - count)
>>>>  (country , area - count)
>>>
>>>
>>> We may need to store the state to update the count. So spark structured
>>> streaming states will come into picture.
>>>
>>> As now with batch programming, we can do it with
>>>
>>>>     df.rollup(col1,col2).count
>>>
>>>
>>> But if I try to use it with spark structured streaming state, will it
>>> store the state of all the groups as well?
>>> I hope I was able to make my point clear.
>>>
>>> Regards
>>> Amit Joshi
>>>
>>> On Wed, Jun 16, 2021 at 11:36 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> Hi,
>>>>
>>>> Just to clarify
>>>>
>>>> Are we talking about* rollup* as a subset of a cube that computes
>>>> hierarchical subtotals from left to right?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 16 Jun 2021 at 16:37, Amit Joshi <mailtojoshia...@gmail.com>
>>>> wrote:
>>>>
>>>>> Appreciate if someone could give some pointers in the question below.
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Amit Joshi <mailtojoshia...@gmail.com>
>>>>> Date: Tue, Jun 15, 2021 at 12:19 PM
>>>>> Subject: [Spark]Does Rollups work with spark structured streaming with
>>>>> state.
>>>>> To: spark-user <user@spark.apache.org>
>>>>>
>>>>>
>>>>> Hi Spark-Users,
>>>>>
>>>>> Hope you are all doing well.
>>>>> Recently I was looking into rollup operations in spark.
>>>>>
>>>>> As we know state based aggregation is supported in spark structured
>>>>> streaming.
>>>>> I was wondering if rollup operations are also supported?
>>>>> Like the state of previous aggregation on the rollups are saved.
>>>>>
>>>>> If rollups are not supported, then what is the standard way to handle
>>>>> this?
>>>>>
>>>>>
>>>>> Regards
>>>>> Amit Joshi
>>>>>
>>>>

Reply via email to