and I use .withColumn("window", window(current_timestamp(), "15
minutes")) after
count

张万新 <kevinzwx1...@gmail.com>于2017年9月13日周三 上午11:32写道:

> *Yes, my code is shown below*
> /**
>     * input
>     */
>   val logs = spark
>     .readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", BROKER_SERVER)
>     .option("subscribe", TOPIC)
>     .option("startingOffset", "latest")
>     .load()
>
>   /**
>     * process
>     */
>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>
>   val events = logValues
>     .map(parseFunction)
>     .select(
>       $"_1".alias("date").cast("timestamp"),
>       $"_2".alias("uuid").cast("string")
>     )
>
>   val results = events
>     .withWatermark("date", "1 day")
>     .dropDuplicates("uuid", "date")
>     .groupBy($"date")
>     .count()
>     .withColumn("window", window(current_timestamp(), "15 minutes"))
>
>   /**
>     * output
>     */
>   val query = results
>     .writeStream
>     .outputMode("update")
>     .format("console")
>     .option("truncate", "false")
>     .trigger(Trigger.ProcessingTime("1 seconds"))
>     .start()
>
>   query.awaitTermination()
>
> *and I use play json to parse input logs from kafka ,the parse function is
> like*
>
>   def parseFunction(str: String): (Long, String) = {
>     val json = Json.parse(str)
>     val timestamp = (json \ "time").get.toString().toLong
>     val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>     val uuid = (json \ "uuid").get.toString()
>     (date, uuid)
>   }
>
> Michael Armbrust <mich...@databricks.com>于2017年9月13日周三 上午2:36写道:
>
>> Can you show all the code?  This works for me.
>>
>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> The spark version is 2.2.0
>>>
>>> Michael Armbrust <mich...@databricks.com>于2017年9月12日周二 下午12:32写道:
>>>
>>>> Which version of spark?
>>>>
>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>
>>>>> Thanks for reply, but using this method I got an exception:
>>>>>
>>>>> "Exception in thread "main"
>>>>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>>>>> expressions are only allowed in
>>>>>
>>>>> Project, Filter, Aggregate or Window"
>>>>>
>>>>> Can you give more advice?
>>>>>
>>>>> Michael Armbrust <mich...@databricks.com>于2017年9月12日周二 上午4:48写道:
>>>>>
>>>>>> import org.apache.spark.sql.functions._
>>>>>>
>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>
>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> In structured streaming how can I add a column to a dataset with
>>>>>>> current system time aligned with 15 minutes?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>

Reply via email to