Hi Mich,

Thanks for your email. I have tried it in batch mode and am still looking to try it in streaming mode. I will keep you updated.
Regards,
Amit Joshi

On Thu, Jun 17, 2021 at 1:07 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> OK, let us start with the basic cube.
>
> Create a DataFrame first:
>
> scala> val df = Seq(
>      |   ("bar", 2L),
>      |   ("bar", 2L),
>      |   ("foo", 1L),
>      |   ("foo", 2L)
>      | ).toDF("word", "num")
> df: org.apache.spark.sql.DataFrame = [word: string, num: bigint]
>
> Now try cube on it:
>
> scala> df.cube($"word", $"num").count.sort(asc("word"), asc("num")).show
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|null|    4|  Total rows in df
> |null|   1|    1|  Count where num equals 1
> |null|   2|    3|  Count where num equals 2
> | bar|null|    2|  Count where word equals bar
> | bar|   2|    2|  Count where word equals bar and num equals 2
> | foo|null|    2|  Count where word equals foo
> | foo|   1|    1|  Count where word equals foo and num equals 1
> | foo|   2|    1|  Count where word equals foo and num equals 2
> +----+----+-----+
>
> And rollup:
>
> scala> df.rollup($"word", $"num").count.sort(asc("word"), asc("num")).show
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|null|    4|  Count of all rows
> | bar|null|    2|  Count where word is bar
> | bar|   2|    2|  Count where word is bar and num is 2
> | foo|null|    2|  Count where word is foo
> | foo|   1|    1|  Count where word is foo and num is 1
> | foo|   2|    1|  Count where word is foo and num is 2
> +----+----+-----+
>
> So rollup() returns a subset of the rows returned by cube(). From the
> above, rollup returns 6 rows whereas cube returns 8 rows. Here are the
> missing rows.
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|   1|    1|  Word is null (any word) and num is 1
> |null|   2|    3|  Word is null (any word) and num is 2
> +----+----+-----+
>
> Now back to Spark Structured Streaming (SSS). We have basic aggregations:
>
>     """
>     We work out the window and the AVG(temperature) in the
>     window's timeframe below.
>     This should return the following DataFrame as a struct:
>
>     root
>      |-- window: struct (nullable = false)
>      |    |-- start: timestamp (nullable = true)
>      |    |-- end: timestamp (nullable = true)
>      |-- avg(temperature): double (nullable = true)
>     """
>     resultM = resultC. \
>         withWatermark("timestamp", "5 minutes"). \
>         groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>         avg('temperature')
>
>     # We take the above DataFrame and flatten it to get the columns
>     # aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
>     resultMF = resultM. \
>         select( \
>               F.col("window.start").alias("startOfWindowFrame") \
>             , F.col("window.end").alias("endOfWindowFrame") \
>             , F.col("avg(temperature)").alias("AVGTemperature"))
>
> Basic aggregation on singular columns can be done like avg('temperature'),
> max(), stddev() etc.
>
> For cube() and rollup() I will require additional columns like location
> etc. in my Kafka topic. Personally I have not tried it, but it will be
> interesting to see if it works.
>
> Have you tried cube() first?
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
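To make the subset relationship concrete: cube(word, num) aggregates over every subset of the grouping columns ({}, {word}, {num}, {word, num}), while rollup(word, num) only aggregates over left-to-right prefixes ({}, {word}, {word, num}). The following is a minimal pure-Python sketch — not Spark, written only to reproduce the 8-row vs 6-row counts from the example above — where None stands in for the nulls Spark shows for aggregated-away columns:

```python
from collections import Counter
from itertools import combinations

rows = [("bar", 2), ("bar", 2), ("foo", 1), ("foo", 2)]
cols = ("word", "num")

def grouping_sets(kind, cols):
    """cube: every subset of cols; rollup: only left-to-right prefixes."""
    if kind == "cube":
        return [set(c) for r in range(len(cols) + 1) for c in combinations(cols, r)]
    return [set(cols[:i]) for i in range(len(cols) + 1)]

def aggregate(kind):
    counts = Counter()
    for word, num in rows:
        full = {"word": word, "num": num}
        for gs in grouping_sets(kind, cols):
            # columns outside the grouping set become None (Spark shows null)
            key = tuple(full[c] if c in gs else None for c in cols)
            counts[key] += 1
    return counts

cube, rollup = aggregate("cube"), aggregate("rollup")
print(len(cube), len(rollup))            # 8 6
print(sorted(set(cube) - set(rollup)))   # [(None, 1), (None, 2)]
```

The two extra rows are exactly the {num}-only grouping set, which rollup skips because {num} is not a prefix of (word, num).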
On Thu, 17 Jun 2021 at 07:44, Amit Joshi <mailtojoshia...@gmail.com> wrote:

>> Hi Mich,
>>
>> Yes, you may think of cube rollups.
>> Let me try to give an example:
>> If we have a stream of data like (country, area, count, time), we would be
>> able to get the updated count with different combinations of keys.
>>
>>> As an example:
>>> (country -> count)
>>> (country, area -> count)
>>
>> We may need to store the state to update the count, so Spark Structured
>> Streaming state will come into the picture.
>>
>> As with batch programming, we can do it with
>>
>>> df.rollup(col1, col2).count
>>
>> But if I try to use it with Spark Structured Streaming state, will it
>> store the state of all the groups as well?
>> I hope I was able to make my point clear.
>>
>> Regards
>> Amit Joshi
>>
>> On Wed, Jun 16, 2021 at 11:36 PM Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Just to clarify:
>>>
>>> Are we talking about *rollup* as a subset of a cube that computes
>>> hierarchical subtotals from left to right?
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> On Wed, 16 Jun 2021 at 16:37, Amit Joshi <mailtojoshia...@gmail.com>
>>> wrote:
>>>
>>>> I would appreciate it if someone could give some pointers on the
>>>> question below.
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Amit Joshi <mailtojoshia...@gmail.com>
>>>> Date: Tue, Jun 15, 2021 at 12:19 PM
>>>> Subject: [Spark] Does rollup work with Spark Structured Streaming
>>>> with state?
>>>> To: spark-user <user@spark.apache.org>
>>>>
>>>> Hi Spark-Users,
>>>>
>>>> Hope you are all doing well.
>>>> Recently I was looking into rollup operations in Spark.
>>>>
>>>> As we know, state-based aggregation is supported in Spark Structured
>>>> Streaming.
>>>> I was wondering if rollup operations are also supported, i.e. whether
>>>> the state of previous aggregations on the rollups is saved.
>>>>
>>>> If rollups are not supported, then what is the standard way to handle
>>>> this?
>>>>
>>>> Regards
>>>> Amit Joshi
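On the open question itself: any rollup-style aggregation over a stream would need to keep state for every group at every level of the hierarchy — the grand total, each (country,), and each (country, area). Whether Spark's state store does this for a streaming rollup is exactly what the thread asks and does not settle. As a toy illustration of what such state would have to contain, here is a pure-Python sketch (this is not Spark's state store; the class, records, and country/area values are made up):

```python
from collections import Counter

class RollupState:
    """Toy stand-in for streaming state: one counter per rollup group.

    Each arriving record updates the grand total (), its (country,) group,
    and its (country, area) group -- so state is held for all three levels.
    """

    def __init__(self):
        self.counts = Counter()

    def update(self, country, area):
        # rollup(country, area) grouping sets are the prefixes of the key
        for key in [(), (country,), (country, area)]:
            self.counts[key] += 1

    def state_size(self):
        # number of distinct groups currently held in state
        return len(self.counts)

state = RollupState()
for rec in [("IN", "north"), ("IN", "south"), ("UK", "east")]:
    state.update(*rec)

print(state.state_size())     # 6: grand total + 2 countries + 3 (country, area) pairs
print(state.counts[("IN",)])  # 2
```

The point of the sketch is the growth pattern: the state footprint is the sum of the distinct keys at every rollup level, not just the finest-grained one.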