Which version of Spark are you using? The app below works with Spark 2.3.1: 200 partitions are stored for state.

    // Imports needed by the snippet; conf, BenchmarkQueryHelper and
    // QueryListenerWriteProgressToFile are defined elsewhere and not shown.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{avg, max, min, window}
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    val queryStatusFile = conf.queryStatusFile()
    val rateRowPerSecond = conf.rateRowPerSecond()
    val rateRampUpTimeSecond = conf.rateRampUpTimeSecond()

    val ss = SparkSession
      .builder()
      .master("local[3]")
      .appName("state coalesce test")
      .getOrCreate()

    ss.streams.addListener(new QueryListenerWriteProgressToFile(queryStatusFile))

    import ss.implicits._

    val df = ss.readStream
      .format("rate")
      .option("rowsPerSecond", rateRowPerSecond)
      .option("rampUpTime", s"${rateRampUpTimeSecond}s")
      .load()

    df.printSchema()

    val outDf = df.withWatermark("timestamp", "10 seconds")
      .selectExpr(
        "timestamp", "mod(value, 100) as mod", "value",
        BenchmarkQueryHelper.createCaseExprStr(
          "mod(CAST(RANDN(0) * 1000 as INTEGER), 50)", 50, 10) + " as word")
      .groupBy(
        window($"timestamp", "1 minute", "10 seconds"),
        $"mod", $"word")
      .agg(max("value").as("max_value"), min("value").as("min_value"),
        avg("value").as("avg_value"))
      .coalesce(8)

    val query = outDf.writeStream
      .format("memory")
      .option("queryName", "stateCoalesceTest")
      .option("checkpointLocation", "/tmp/state-coalesce-test")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .outputMode(OutputMode.Update())
      .start()

    query.awaitTermination()

-Jungtaek Lim (HeartSaVioR)

On Thu, Aug 9, 2018 at 8:38 PM, WangXiaolong <roland8...@163.com> wrote:

> Hi,
>
> Lately, I encountered a problem while writing a structured
> streaming job that writes data into OpenTSDB.
> The write-stream part looks something like this:
>
> outputDs
>   .coalesce(14)
>   .writeStream
>   .outputMode("append")
>   .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
>   .option("checkpointLocation", s"$checkpointDir/$appName/tsdb")
>   .foreach {
>     TsdbWriter(
>       tsdbUrl,
>       MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword,
>         mongoDatabase, mongoCollection, mongoAuthenticationDatabase)
>     )(createMetricBuilder(tsdbMetricPrefix))
>   }
>   .start()
>
> When I checked the checkpoint dir, I discovered that the
> "/checkpoint/state" dir is empty. I looked into the executor's log and
> found that the HDFSBackedStateStoreProvider didn't write anything to the
> checkpoint dir.
>
> The strange thing is that when I replace the "coalesce" function with
> the "repartition" function, the problem is solved. Is there a difference
> between these two functions when using structured streaming?
>
> Looking forward to your help, thanks.
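
One way to check whether state was written, and how many state partitions exist (the reply reports 200), is to count the partition directories under the checkpoint. The following is only a minimal sketch and not part of the original thread. It assumes the checkpoint lives on the local filesystem (as with /tmp/state-coalesce-test in the reply) and that state is laid out as <checkpointLocation>/state/<operatorId>/<partitionId>, which to the best of my understanding is how HDFSBackedStateStoreProvider arranges it. StateDirCheck and countStatePartitions are hypothetical names introduced for illustration.

```scala
import java.io.File

object StateDirCheck {
  /** Counts numerically named partition directories under
    * <checkpointDir>/state/<operatorId>. Returns 0 when the operator
    * directory is missing or empty (the symptom described in the question).
    * Assumption: the checkpoint is on the local filesystem, not HDFS.
    */
  def countStatePartitions(checkpointDir: String, operatorId: Int = 0): Int = {
    val operatorDir = new File(new File(checkpointDir, "state"), operatorId.toString)
    // listFiles() returns null if the path does not exist or is not a directory.
    Option(operatorDir.listFiles())
      .getOrElse(Array.empty[File])
      .count(f => f.isDirectory && f.getName.nonEmpty && f.getName.forall(_.isDigit))
  }

  def main(args: Array[String]): Unit = {
    // Default path taken from the checkpointLocation used in the reply.
    val dir = args.headOption.getOrElse("/tmp/state-coalesce-test")
    println(s"state partitions under $dir: ${countStatePartitions(dir)}")
  }
}
```

A count of 0 would match the empty "/checkpoint/state" directory described in the question, while a count equal to the number of shuffle partitions (200 by default via spark.sql.shuffle.partitions) would match what the reply observed.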