Hi :
   I have some questions about spark structured streaming window output  in 
spark 2.3.1.  I write the application code as following:

case class DataType(time:Timestamp, value:Long) {}

val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[1]")
      .getOrCreate()
 
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to 
localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].map(l => {
  var tmp = l.split(",")
  DataType(Timestamp.valueOf(tmp(0)), tmp(1).toLong)
}).as[DataType]

val windowedCounts = words
      .withWatermark("time", "1 minutes")
      .groupBy(window($"time", "5 minutes"))
      .agg(sum("value") as "sumvalue")
      .select("window.start", "window.end","sumvalue")

val query = windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()

the input data format is :
2018-08-20 12:01:00,1
2018-08-20 12:02:01,1

My questions are:
1、when I set the append output model,  I send inputdata, but there is no result 
to output. How to use append model in window aggreate case ?
2、when I set the update output model, I send inputdata, the result is output 
every batch .But I want output the result only once when window is end. How can 
I do?

Thanks in advance!




z...@zjdex.com

Reply via email to