I have a scenario in which we need to calculate 'charges' for a stream of
events which has the following details:
1. Event contains eventTime, facet, units
2. Free quantity per facet that needs to offset the earliest events based on
the eventTime
3. Prices are also specified per facet
4. Events that arrive in a single minute can be considered equivalent (for
reduced state maintenance) and all of them need to have free units
proportionally distributed
I was hoping to make it the work in the following manner using spark structured
streaming
1. Aggregate events at a minute level per facet using the window function per
facet
2. Join with the price and free quantity
3. Group by facet
4. flatMapGroup by facet to then sort the aggregation by window start time,
apply the results
what I am noticing is that the output of #4 is just the aggregation for which
new events came in and not all the aggregation since the watermark.
**Qn: How can I fix this code to get all aggregation since the watermark or
from a previous wh?**
Could someone help? A sample code is below…
Thanks
---------------
package test
import java.sql.Timestamp
import java.util.UUID
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout,
OutputMode, Trigger}
import org.apache.spark.streaming.{Seconds, StreamingContext}
case class UsageEvent(Id: String, facetId: String, Units: Double,
timeStamp:Timestamp)
case class FacetPricePoints(facetId: String, Price: Double, FreeUnits: Double)
case class UsageBlock(facetId: String, start:Timestamp, Units: Double)
case class UsageBlockWithPrice(facetId: String, start:Timestamp, Units: Double,
Price: Double, FreeUnits: Double)
case class UsageBlockWithCharge(facetId: String, start:Timestamp, Units:
Double, Price: Double, FreeUnits: Double, ChargedUnits: Double, Charge: Double)
object TestProcessing {
def getUsageEventStream(ts: Timestamp, units: String) : UsageEvent =
{UsageEvent(UUID.randomUUID().toString, "F1", units.toInt % 20, ts)}
implicit def ordered: Ordering[Timestamp] = new Ordering[Timestamp] {def
compare(x: Timestamp, y: Timestamp): Int = x compareTo y}
def ChargeUsageBlock(Key: String, Value: Iterator[UsageBlockWithPrice]) :
Iterator[UsageBlockWithCharge] =
{
val usageBlocks = Value.toList.sortBy(ub => ub.start)
var freeUnits = 0.0
var freeUnitsSet = false
var newUe = for (ue <- usageBlocks)
yield {
freeUnits = if (!freeUnitsSet) ue.FreeUnits else freeUnits
freeUnitsSet = true
val freeUnitsInBlock = if (freeUnits > ue.Units) ue.Units else freeUnits
val chargedUnits = ue.Units - freeUnitsInBlock
freeUnits -= freeUnitsInBlock // todo: need to specify precision and
rounding
UsageBlockWithCharge(ue.facetId, ue.start, ue.Units, ue.Price,
freeUnitsInBlock, chargedUnits, chargedUnits * ue.Price)
}
newUe.iterator
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("Test").getOrCreate()
val stream = spark.readStream.format("rate").option("rowsPerSecond",
1).load()
import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
val prices = ssc.sparkContext.parallelize(List( FacetPricePoints("F1",
30.0, 100.0))).toDF()
val getUsageEventStreamUDF = udf((ts: Timestamp, units: String) =>
getUsageEventStream(ts, units)) // .where($"Units" < 2).
val usageEventsRaw = stream.withColumn("Usage",
getUsageEventStreamUDF(stream("timestamp"),
stream("value"))).select("Usage.*").as[UsageEvent].dropDuplicates("Id").withWatermark("timeStamp",
"1 hour")
val aggUsage = usageEventsRaw.groupBy($"facetId", window($"timeStamp", "1
minute")).agg(sum($"Units") as "Units").selectExpr("facetId", "window.start",
"Units").as[UsageBlock]
val fifoRate = (Key: String, Value: Iterator[UsageBlockWithPrice]) => {
ChargeUsageBlock(Key, Value) }
val aggUsageCharge = aggUsage.joinWith(prices, prices.col("facetId") ===
usageEventsRaw.col("facetId")).select("_1.*", "_2.Price",
"_2.FreeUnits").as[UsageBlockWithPrice].groupByKey(x =>
x.facetId).flatMapGroups(fifoRate).withWatermark("start", "1 hour")
val fin = aggUsageCharge.writeStream.trigger((Trigger.ProcessingTime("10
seconds"))).outputMode(OutputMode.Update).format("console").start()
// this applies freeUnits for every minute instead of just
applying it once
fin.awaitTermination()
ssc.start()
ssc.awaitTermination()
}
}