HI Mich,

Thanks for your email.
I have tried for the batch mode,
Still looking to try in streaming mode.
Will update you as per.


Regards
Amit Joshi

On Thu, Jun 17, 2021 at 1:07 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> OK let us start with the basic cube
>
> create a DF first
>
> scala> val df = Seq(
>      |   ("bar", 2L),
>      |   ("bar", 2L),
>      |   ("foo", 1L),
>      |   ("foo", 2L)
>      | ).toDF("word", "num")
> df: org.apache.spark.sql.DataFrame = [word: string, num: bigint]
>
>
> Now try cube on it
>
>
> scala> df.cube($"word", $"num").count.sort(asc("word"), asc("num")).show
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|null|    4| Total rows in df
> |null|   1|    1| Count where num equals 1
> |null|   2|    3| Count where num equals 2
> | bar|null|    2| Where word equals bar
> | bar|   2|    2| Where word equals bar and num equals 2
> | foo|null|    2| Where word equals foo
> | foo|   1|    1| Where word equals foo and num equals 1
> | foo|   2|    1| Where word equals foo and num equals 2
> +----+----+-----+
>
>
> and rollup
>
>
> scala> df.rollup($"word",$"num").count.sort(asc("word"), asc("num")).show
>
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|null|    4| Count of all rows
> | bar|null|    2| Count when word is bar
> | bar|   2|    2| Count when num is 2
> | foo|null|    2| Count when word is foo
> | foo|   1|    1| When word is foo and num is 1
> | foo|   2|    1| When word is foo and num is 2
> +----+----+-----+
>
>
> So rollup() returns a subset of the rows returned by cube(). From the
> above, rollup returns 6 rows whereas cube returns 8 rows. Here are the
> missing rows.
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|   1|    1| Word is null and num is 1
> |null|   2|    3| Word is null and num is 2
> +----+----+-----+
>
> Now back to Spark Structured Streaming (SSS), we have basic aggregations
>
>
>             """
>             We work out the window and the AVG(temperature) in the
> window's timeframe below
>             This should return back the following Dataframe as struct
>
>              root
>              |-- window: struct (nullable = false)
>              |    |-- start: timestamp (nullable = true)
>              |    |-- end: timestamp (nullable = true)
>              |-- avg(temperature): double (nullable = true)
>
>             """
>             resultM = resultC. \
>                      withWatermark("timestamp", "5 minutes"). \
>                      groupBy(window(resultC.timestamp, "5 minutes", "5
> minutes")). \
>                      avg('temperature')
>
>             # We take the above Dataframe and flatten it to get the
> columns aliased as "startOfWindowFrame", "endOfWindowFrame" and
> "AVGTemperature"
>             resultMF = resultM. \
>                        select( \
>
> F.col("window.start").alias("startOfWindowFrame") \
>                           , F.col("window.end").alias("endOfWindowFrame") \
>                           ,
> F.col("avg(temperature)").alias("AVGTemperature"))
>
> Now basic aggregation on singular columns can be done like
> avg('temperature'),max(),stddev() etc
>
>
> For cube() and rollup() I will require additional columns like location
> etc in my kafka topic. Personally I have not tried it but it will be
> interesting to see if it works.
>
>
> Have you tried cube() first?
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 17 Jun 2021 at 07:44, Amit Joshi <mailtojoshia...@gmail.com>
> wrote:
>
>> Hi Mich,
>>
>> Yes, you may think of cube rollups.
>> Let me try to give an example:
>> If we have a stream of data like (country,area,count, time), we would be
>> able to get the updated count with different combinations of keys.
>>
>>> As example -
>>>  (country - count)
>>>  (country , area - count)
>>
>>
>> We may need to store the state to update the count. So spark structured
>> streaming states will come into picture.
>>
>> As now with batch programming, we can do it with
>>
>>>     df.rollup(col1,col2).count
>>
>>
>> But if I try to use it with spark structured streaming state, will it
>> store the state of all the groups as well?
>> I hope I was able to make my point clear.
>>
>> Regards
>> Amit Joshi
>>
>> On Wed, Jun 16, 2021 at 11:36 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>> Just to clarify
>>>
>>> Are we talking about* rollup* as a subset of a cube that computes
>>> hierarchical subtotals from left to right?
>>>
>>>
>>>
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 16 Jun 2021 at 16:37, Amit Joshi <mailtojoshia...@gmail.com>
>>> wrote:
>>>
>>>> Appreciate if someone could give some pointers in the question below.
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Amit Joshi <mailtojoshia...@gmail.com>
>>>> Date: Tue, Jun 15, 2021 at 12:19 PM
>>>> Subject: [Spark]Does Rollups work with spark structured streaming with
>>>> state.
>>>> To: spark-user <user@spark.apache.org>
>>>>
>>>>
>>>> Hi Spark-Users,
>>>>
>>>> Hope you are all doing well.
>>>> Recently I was looking into rollup operations in spark.
>>>>
>>>> As we know state based aggregation is supported in spark structured
>>>> streaming.
>>>> I was wondering if rollup operations are also supported?
>>>> Like the state of previous aggregation on the rollups are saved.
>>>>
>>>> If rollups are not supported, then what is the standard way to handle
>>>> this?
>>>>
>>>>
>>>> Regards
>>>> Amit Joshi
>>>>
>>>

Reply via email to