Re: Generate windows on processing time in Spark Structured Streaming

2017-11-10 Thread Michael Armbrust
Hmmm, we should allow that.  current_timestamp() is actually deterministic
within any given batch.  Could you open a JIRA ticket?
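To show why a per-batch timestamp is enough for this query, here is a small plain-Scala model of what the quoted SQL is meant to compute. It uses no Spark and is not Spark's implementation. Every row in a micro-batch gets the same processing time (assumed here to be taken once per batch), and rows are bucketed into 5-second tumbling windows aligned to the epoch. `ProcessingTimeWindows`, `tumblingWindow`, `countBatch` and `merge` are hypothetical names. The error message lists Project as an operator that accepts nondeterministic expressions, so one possible workaround, not verified against this Spark version, is to compute `current_timestamp()` first, in a subquery or with `withColumn`, and then pass that column to `window`.

```scala
// A plain-Scala model (no Spark dependency) of processing-time tumbling windows.
// Assumption: all rows of one micro-batch share a single processing timestamp,
// mirroring the claim that current_timestamp() is fixed within a batch.
object ProcessingTimeWindows {

  // Half-open window [start, end) in epoch milliseconds.
  final case class Window(start: Long, end: Long)

  // Tumbling window of `sizeMs` milliseconds that contains `ts`, aligned to the
  // epoch. Math.floorMod keeps the result correct for negative timestamps.
  def tumblingWindow(ts: Long, sizeMs: Long): Window = {
    require(sizeMs > 0, "window size must be positive")
    val start = ts - Math.floorMod(ts, sizeMs)
    Window(start, start + sizeMs)
  }

  // Word counts for one micro-batch. Because every row is stamped with the same
  // `batchTime`, all words of the batch fall into the same window.
  def countBatch(words: Seq[String], batchTime: Long, sizeMs: Long): Map[(Window, String), Int] = {
    val w = tumblingWindow(batchTime, sizeMs)
    words.groupBy(identity).map { case (word, occurrences) => (w, word) -> occurrences.size }
  }

  // "complete" output mode re-emits running totals, so sum the per-batch counts.
  def merge(batches: Seq[Map[(Window, String), Int]]): Map[(Window, String), Int] =
    batches.flatten.groupBy(_._1).map { case (key, pairs) => key -> pairs.map(_._2).sum }
}
```

With 5-second windows (`sizeMs = 5000`), a batch stamped at 12345 ms counts its words in [10000, 15000), and a later batch stamped at 17000 ms starts a new window [15000, 20000).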

On Fri, Nov 10, 2017 at 1:52 AM, wangsan  wrote:

> Hi all,
>
> How can I use current processing time to generate windows in stream
> processing?
> The Scala doc for the *window* function says "For a streaming query, you may use
> the function current_timestamp to generate windows on processing time."
> But when I use current_timestamp as the column in the window function, an
> exception occurred.
>
> Here is my code:
>
> val socketDF = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", )
>   .load()
>
> socketDF.createOrReplaceTempView("words")
> val windowedCounts = spark.sql(
>   """
> |SELECT value as word, current_timestamp() as time, count(1) as count FROM words
> |   GROUP BY window(time, "5 seconds"), word
>   """.stripMargin)
>
> windowedCounts
>   .writeStream
>   .outputMode("complete")
>   .format("console")
>   .start()
>   .awaitTermination()
>
> And here is the exception info:
> Caused by: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
> Project, Filter, Aggregate or Window, found:


Generate windows on processing time in Spark Structured Streaming

2017-11-10 Thread wangsan
Hi all,


How can I use current processing time to generate windows in stream
processing?
The Scala doc for the window function says "For a streaming query, you may use the
function current_timestamp to generate windows on processing time." But when
I use current_timestamp as the column in the window function, an exception occurred.


Here is my code:

val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

socketDF.createOrReplaceTempView("words")
val windowedCounts = spark.sql(
  """
|SELECT value as word, current_timestamp() as time, count(1) as count FROM words
|   GROUP BY window(time, "5 seconds"), word
  """.stripMargin)

windowedCounts
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()

And here is the exception info:
Caused by: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found: