Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread kant kodali
That makes a lot of sense now! I am looking for a tumbling window, so my
window interval and batch interval are both 24 hours. Every day I want to
start with a fresh state.
Finally, you said I need to do the bookkeeping of the "last 24 hours" myself.
Do you mean I need to keep this in some external store and then compute the
max? Unless I store the history in an external store, I don't see a way to
retrieve all of it, especially since GroupState.get() seems to return only
the most recently updated state rather than the entire history.
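
One way to look at the question above: for a daily maximum, the per-key state may not need the full history at all, since the state can be just the current day plus the best row seen so far. The following is a minimal plain-Scala sketch of that update logic, with `Option` standing in for `GroupState`. The names `TrainEvent`, `DailyMax`, `dayOf`, and `updateDailyMax` are hypothetical (they are not from the thread or from the Spark API), and the sketch assumes 24-hour windows aligned to the epoch.

```scala
object DailyMaxSketch {
  // Hypothetical event type: `time` is the value to maximize and
  // `activityTs` is the event timestamp, both in epoch milliseconds.
  final case class TrainEvent(train: Int, dest: String, time: Long, activityTs: Long)

  // Hypothetical state: the tumbling-window day plus the best row seen in it.
  final case class DailyMax(day: Long, best: TrainEvent)

  private val DayMs: Long = 24L * 60 * 60 * 1000

  // Index of the 24-hour tumbling window (aligned to the epoch) for a timestamp.
  def dayOf(ts: Long): Long = Math.floorDiv(ts, DayMs)

  // Fold new events into the previous state. An event from a later day
  // replaces the old state; an event from an earlier day is ignored.
  def updateDailyMax(prev: Option[DailyMax], events: Iterator[TrainEvent]): Option[DailyMax] =
    events.foldLeft(prev) { (state, e) =>
      val d = dayOf(e.activityTs)
      state match {
        case Some(s) if d < s.day                           => state // late event from an older day
        case Some(s) if d == s.day && e.time <= s.best.time => state // same day, not a new max
        case _                                              => Some(DailyMax(d, e)) // first event, new day, or new max
      }
    }
}
```

With this shape the state stays constant in size per train, whatever the number of events in a day.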


On Wed, Aug 30, 2017 at 4:09 PM, Tathagata Das 
wrote:

> The per-key state S is kept in the memory. It has to be of a type that can
> be encoded by Datasets. All you have to do is update S every time the
> function is called, and the engine takes care of serializing/checkpointing
> the state value, and retrieving the correct version of the value when
> restarting from failures. So you explicitly don't have to "store" the state
> anywhere, the engine takes care of it under the hood. Internally, there is
> an interface called StateStore,
> which defines a component who is actually responsible for checkpointing the
> values, etc. And there is a single implementation
> of the store that keeps the values in a hashmap and writes all changes to
> the values to a HDFS-API-compatible fault-tolerant filesystem for
> checkpointing. With this, by default, you really don't have to worry about
> externalizing it and you don't have overload any thing in GroupState. You
> just use it as the example shows.
>
> It's important to note that all the state of all the keys is distributed
> over the executors. So each executor will have in its memory, a fraction of
> the all the train state. Depending on the number of trains, and the amount
> of data in the state, you will have to size the cluster and the workers
> accordingly. If you keep a lot of state for each train, then your overall
> memory requirements are going to increase. So you have to be judicious
> about how much data to keep as state data for each key.
>
> Regarding aggregation vs mapGroupsWithState, it's a trade-off between
> efficiency and flexibility. With aggregation, you can do sliding window of
> "24 hours" sliding every "1 hour", which will give max in "last 24 hours"
> updated every "1 hour". If you are okay with this approximation, then this
> is easiest to implement (don't forget setting watermarks) and most
> efficient. If you really want something more precise than that, then
> mapGroupsWithState is the ultimate flexible tool. However, you have to do
> bookkeeping of "last 24 hours" and calculate the max yourself. :)
>
> Hope this helps.
>
> On Wed, Aug 30, 2017 at 10:58 AM, kant kodali  wrote:
>
>> I think I understand *groupByKey/**mapGroupsWithState *and I am still
>> trying to wrap my head around *GroupState*. so, I believe I have a
>> naive questions to ask on *GroupState*.
>>
>> If I were to represent a state that has history of events (say 24 hours)
>> and say the number of events can be big for a given 24 hour period. where
>> do I store the state S? An external store like Kafka or a Database or a
>> Distributed File system ? I wonder if I can represent the state S using a
>> DataSet that represents the history of events? GroupState also has
>> .exists() and  .get() and if I am not wrong I should override these methods
>> right so comparisons and retrieval from external store can work?
>>
>> Thanks!
>>
>>
>>
>> On Wed, Aug 30, 2017 at 1:39 AM, kant kodali  wrote:
>>
>>> Hi TD,
>>>
>>> Thanks for the explanation and for the clear pseudo code and an example!
>>>
>>> mapGroupsWithState is cool and looks very flexible however I have few
>>> concerns and questions. For example
>>>
>>> Say I store TrainHistory as max heap from the Java Collections library
>>> and I keep adding to to this heap for 24 hours and at some point I will run
>>> out of Java heap space right? Do I need to store TrainHistory as a
>>> DataSet or DataFrame instead of in memory max heap object from Java
>>> Collections library?
>>>
>>> I wonder between *Nested query* vs  *groupByKey/**mapGroupsWithState*
>>> which approach is more efficient to solve this particular problem ?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread Tathagata Das
The per-key state S is kept in memory. It has to be of a type that can
be encoded by Datasets. All you have to do is update S every time the
function is called, and the engine takes care of serializing/checkpointing
the state value and retrieving the correct version of the value when
restarting from failures. So you don't have to explicitly "store" the state
anywhere; the engine takes care of it under the hood. Internally, there is
an interface called StateStore, which defines the component that is actually
responsible for checkpointing the values, etc. And there is a single
implementation of the store that keeps the values in a hashmap and writes
all changes to the values to an HDFS-API-compatible fault-tolerant filesystem
for checkpointing. With this, by default, you really don't have to worry
about externalizing it, and you don't have to override anything in
GroupState. You just use it as the example shows.

It's important to note that the state of all the keys is distributed
over the executors. So each executor will hold in its memory a fraction of
all the train state. Depending on the number of trains and the amount
of data in the state, you will have to size the cluster and the workers
accordingly. If you keep a lot of state for each train, then your overall
memory requirements are going to increase. So you have to be judicious
about how much data to keep as state for each key.

Regarding aggregation vs mapGroupsWithState, it's a trade-off between
efficiency and flexibility. With aggregation, you can do a sliding window of
"24 hours" sliding every "1 hour", which will give the max in the "last 24
hours" updated every "1 hour". If you are okay with this approximation, then
this is the easiest to implement (don't forget to set watermarks) and the most
efficient. If you really want something more precise than that, then
mapGroupsWithState is the ultimate flexible tool. However, you have to do the
bookkeeping of the "last 24 hours" and calculate the max yourself. :)
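
The "bookkeeping" mentioned above could be sketched roughly as follows, in plain Scala with `Option` standing in for `GroupState`. This is only an illustration under stated assumptions: `TrainEvent`, `TrainHistory`, and `updateHistory` are hypothetical names, timestamps are epoch milliseconds, and "last 24 hours" is measured back from the newest activity timestamp seen for the key (a real job might use the watermark or processing time instead).

```scala
object RollingHistorySketch {
  // Hypothetical event type: `time` is the value to maximize and
  // `activityTs` is the event timestamp, both in epoch milliseconds.
  final case class TrainEvent(train: Int, dest: String, time: Long, activityTs: Long)

  // Hypothetical per-train state: every event from the last 24 hours.
  final case class TrainHistory(events: Vector[TrainEvent])

  private val WindowMs: Long = 24L * 60 * 60 * 1000

  // Append the incoming events, drop everything that is 24 hours or more
  // older than the newest activity timestamp, and return the pruned history
  // together with the row that has the max `time`.
  def updateHistory(
      prev: Option[TrainHistory],
      incoming: Iterator[TrainEvent]): (TrainHistory, Option[TrainEvent]) = {
    val all = prev.map(_.events).getOrElse(Vector.empty[TrainEvent]) ++ incoming
    if (all.isEmpty) (TrainHistory(Vector.empty), None)
    else {
      val newest = all.map(_.activityTs).max
      val kept = all.filter(_.activityTs > newest - WindowMs)
      (TrainHistory(kept), Some(kept.maxBy(_.time)))
    }
  }
}
```

Unlike keeping only the current max, this state grows with the number of events per train in a 24-hour period, which is exactly the memory trade-off described above.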

Hope this helps.

On Wed, Aug 30, 2017 at 10:58 AM, kant kodali  wrote:

> I think I understand *groupByKey/**mapGroupsWithState *and I am still
> trying to wrap my head around *GroupState*. so, I believe I have a
> naive questions to ask on *GroupState*.
>
> If I were to represent a state that has history of events (say 24 hours)
> and say the number of events can be big for a given 24 hour period. where
> do I store the state S? An external store like Kafka or a Database or a
> Distributed File system ? I wonder if I can represent the state S using a
> DataSet that represents the history of events? GroupState also has
> .exists() and  .get() and if I am not wrong I should override these methods
> right so comparisons and retrieval from external store can work?
>
> Thanks!
>
>
>
> On Wed, Aug 30, 2017 at 1:39 AM, kant kodali  wrote:
>
>> Hi TD,
>>
>> Thanks for the explanation and for the clear pseudo code and an example!
>>
>> mapGroupsWithState is cool and looks very flexible however I have few
>> concerns and questions. For example
>>
>> Say I store TrainHistory as max heap from the Java Collections library
>> and I keep adding to to this heap for 24 hours and at some point I will run
>> out of Java heap space right? Do I need to store TrainHistory as a
>> DataSet or DataFrame instead of in memory max heap object from Java
>> Collections library?
>>
>> I wonder between *Nested query* vs  *groupByKey/**mapGroupsWithState*
>> which approach is more efficient to solve this particular problem ?
>>
>> Thanks!
>>
>>
>>
>>
>>
>> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Aah, I might have misinterpreted. The groupBy + window solution would
>>> give the max time for each train over 24 hours (non-overlapping window) of
>>> event data (timestamped by activity_timestamp). So the output would be
>>> like.
>>>
>>> Train  Dest  Window(activity_timestamp)  max(Time)
>>> 1      HK    Aug28-00:00 to Aug29-00:00  10:00      <- updating currently through aug29
>>> 1      HK    Aug27-00:00 to Aug28-00:00  09:00      <- not updating as no new updates coming in with activity_timestamp in this range.
>>>
>>> The drawback of this approach is that as soon as Aug28 starts, you have
>>> wait for new event about a train to get a new max(time). You may rather
>>> want a rolling 24 hour period, that is, the max time known over events in
>>> the last 24 hours.
>>> Then maintaining our own custom state using mapGroupsWithState/
>>> flatMapGroupsWithState() is the best and most flexible option.
>>> It is available in Spark 2.2 in Scala, Java.
>>>
>>> Here is an example that tracks 

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread kant kodali
I think I understand groupByKey/mapGroupsWithState, but I am still
trying to wrap my head around GroupState, so I have a few naive
questions about GroupState.

If I were to represent a state that holds a history of events (say 24 hours),
and the number of events can be big for a given 24-hour period, where
do I store the state S? In an external store like Kafka, a database, or a
distributed file system? I wonder if I can represent the state S using a
Dataset that represents the history of events? GroupState also has
.exists() and .get(), and if I am not wrong I should override these methods
so that comparisons and retrieval from the external store can work, right?

Thanks!



On Wed, Aug 30, 2017 at 1:39 AM, kant kodali  wrote:

> Hi TD,
>
> Thanks for the explanation and for the clear pseudo code and an example!
>
> mapGroupsWithState is cool and looks very flexible however I have few
> concerns and questions. For example
>
> Say I store TrainHistory as max heap from the Java Collections library and
> I keep adding to to this heap for 24 hours and at some point I will run out
> of Java heap space right? Do I need to store TrainHistory as a DataSet or
> DataFrame instead of in memory max heap object from Java Collections
> library?
>
> I wonder between *Nested query* vs  *groupByKey/**mapGroupsWithState*
> which approach is more efficient to solve this particular problem ?
>
> Thanks!
>
>
>
>
>
> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Aah, I might have misinterpreted. The groupBy + window solution would
>> give the max time for each train over 24 hours (non-overlapping window) of
>> event data (timestamped by activity_timestamp). So the output would be
>> like.
>>
>> Train  Dest  Window(activity_timestamp)  max(Time)
>> 1      HK    Aug28-00:00 to Aug29-00:00  10:00      <- updating currently through aug29
>> 1      HK    Aug27-00:00 to Aug28-00:00  09:00      <- not updating as no new updates coming in with activity_timestamp in this range.
>>
>> The drawback of this approach is that as soon as Aug28 starts, you have
>> wait for new event about a train to get a new max(time). You may rather
>> want a rolling 24 hour period, that is, the max time known over events in
>> the last 24 hours.
>> Then maintaining our own custom state using mapGroupsWithState/
>> flatMapGroupsWithState() is the best and most flexible option.
>> It is available in Spark 2.2 in Scala, Java.
>>
>> Here is an example that tracks sessions based on events.
>> Scala - https://github.com/apache/spark/blob/master/examples/src/
>> main/scala/org/apache/spark/examples/sql/streaming/Structu
>> redSessionization.scala
>>
>> You will have to create a custom per-train state which keeps track of
>> last 24 hours of trains history, and use that state to calculate the max
>> time for each train.
>>
>>
>> def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
>> state: GroupState[TrainHistory]): Long = {
>> // for every event, update history (i.e. last 24 hours of events) and
>> return the max time from the history
>> }
>>
>> trainTimesDataset // Dataset[TrainEvents]
>>   .groupByKey(_.train)
>>   .mapGroupsWithState(updateHistoryAndGetMax)
>>
>> Hope this helps.
>>
>>
>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:
>>
>>> Hey TD,
>>>
>>> If I understood the question correctly, your solution wouldn't return
>>> the exact solution, since it also groups by on destination. I would say the
>>> easiest solution would be to use flatMapGroupsWithState, where you:
>>> .groupByKey(_.train)
>>>
>>> and keep in state the row with the maximum time.
>>>

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread kant kodali
Hi TD,

Thanks for the explanation, the clear pseudocode, and the example!

mapGroupsWithState is cool and looks very flexible; however, I have a few
concerns and questions. For example:

Say I store TrainHistory as a max heap from the Java Collections library and
I keep adding to this heap for 24 hours. At some point I will run out
of Java heap space, right? Do I need to store TrainHistory as a Dataset or
DataFrame instead of an in-memory max heap object from the Java Collections
library?

Between a nested query and groupByKey/mapGroupsWithState, which
approach is more efficient for solving this particular problem?

Thanks!





On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das 
wrote:

> Aah, I might have misinterpreted. The groupBy + window solution would give
> the max time for each train over 24 hours (non-overlapping window) of event
> data (timestamped by activity_timestamp). So the output would be like.
>
> Train  Dest  Window(activity_timestamp)  max(Time)
> 1      HK    Aug28-00:00 to Aug29-00:00  10:00      <- updating currently through aug29
> 1      HK    Aug27-00:00 to Aug28-00:00  09:00      <- not updating as no new updates coming in with activity_timestamp in this range.
>
> The drawback of this approach is that as soon as Aug28 starts, you have
> wait for new event about a train to get a new max(time). You may rather
> want a rolling 24 hour period, that is, the max time known over events in
> the last 24 hours.
> Then maintaining our own custom state using 
> mapGroupsWithState/flatMapGroupsWithState()
> is the best and most flexible option.
> It is available in Spark 2.2 in Scala, Java.
>
> Here is an example that tracks sessions based on events.
> Scala - https://github.com/apache/spark/blob/master/examples/
> src/main/scala/org/apache/spark/examples/sql/streaming/
> StructuredSessionization.scala
>
> You will have to create a custom per-train state which keeps track of last
> 24 hours of trains history, and use that state to calculate the max time
> for each train.
>
>
> def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
> state: GroupState[TrainHistory]): Long = {
> // for every event, update history (i.e. last 24 hours of events) and
> return the max time from the history
> }
>
> trainTimesDataset // Dataset[TrainEvents]
>   .groupByKey(_.train)
>   .mapGroupsWithState(updateHistoryAndGetMax)
>
> Hope this helps.
>
>
> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:
>
>> Hey TD,
>>
>> If I understood the question correctly, your solution wouldn't return the
>> exact solution, since it also groups by on destination. I would say the
>> easiest solution would be to use flatMapGroupsWithState, where you:
>> .groupByKey(_.train)
>>
>> and keep in state the row with the maximum time.
>>
>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Yes. And in that case, if you just care about only the last few days of
>>> max, then you should set watermark on the timestamp column.
>>>
>>>  *trainTimesDataset*
>>> *  .withWatermark("**activity_timestamp", "5 days")*
>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train",
>>> "dest")*
>>> *  .max("time")*
>>>
>>> Any counts which are more than 5 days old will be dropped from the
>>> streaming state.
>>>

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Aah, I might have misinterpreted. The groupBy + window solution would give
the max time for each train over 24 hours (non-overlapping window) of event
data (timestamped by activity_timestamp). So the output would look like this:

Train  Dest  Window(activity_timestamp)  max(Time)
1      HK    Aug28-00:00 to Aug29-00:00  10:00      <- updating currently through Aug29
1      HK    Aug27-00:00 to Aug28-00:00  09:00      <- not updating, as no new updates are coming in with activity_timestamp in this range
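
To make the table above concrete, here is a small plain-Scala sketch of how an event could be assigned to a non-overlapping 24-hour window and how a max per (train, dest, window) could then be computed on an in-memory collection. It is a model of the idea only, not Spark's implementation: `TrainEvent`, `windowOf`, and `maxPerWindow` are hypothetical names, timestamps are epoch milliseconds, and windows are assumed to be aligned to the epoch.

```scala
object TumblingWindowSketch {
  // Hypothetical event type: `time` is the value to maximize and
  // `activityTs` is the event timestamp, both in epoch milliseconds.
  final case class TrainEvent(train: Int, dest: String, time: Long, activityTs: Long)

  private val DayMs: Long = 24L * 60 * 60 * 1000

  // [start, end) of the 24-hour tumbling window that contains `ts`.
  def windowOf(ts: Long): (Long, Long) = {
    val start = Math.floorDiv(ts, DayMs) * DayMs
    (start, start + DayMs)
  }

  // max(time) per (train, dest, window start), mirroring the grouping in the table.
  def maxPerWindow(events: Seq[TrainEvent]): Map[(Int, String, Long), Long] =
    events
      .groupBy(e => (e.train, e.dest, windowOf(e.activityTs)._1))
      .map { case (key, es) => key -> es.map(_.time).max }
}
```

Each event falls into exactly one window, so an older window stops changing once no more events with timestamps in its range arrive.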

The drawback of this approach is that as soon as Aug28 starts, you have
to wait for a new event about a train to get a new max(time). You may rather
want a rolling 24-hour period, that is, the max time known over events in
the last 24 hours.
In that case, maintaining your own custom state using
mapGroupsWithState/flatMapGroupsWithState()
is the best and most flexible option.
It is available in Spark 2.2 in Scala and Java.

Here is an example that tracks sessions based on events.
Scala -
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

You will have to create a custom per-train state which keeps track of last
24 hours of trains history, and use that state to calculate the max time
for each train.


def updateHistoryAndGetMax(
    train: String,
    events: Iterator[TrainEvents],
    state: GroupState[TrainHistory]): Long = {
  // For every event, update the history (i.e. the last 24 hours of events)
  // and return the max time from the history.
  ??? // body intentionally left unimplemented in this outline
}

trainTimesDataset // Dataset[TrainEvents]
  .groupByKey(_.train)
  .mapGroupsWithState(updateHistoryAndGetMax)

Hope this helps.


On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:

> Hey TD,
>
> If I understood the question correctly, your solution wouldn't return the
> exact solution, since it also groups by on destination. I would say the
> easiest solution would be to use flatMapGroupsWithState, where you:
> .groupByKey(_.train)
>
> and keep in state the row with the maximum time.
>
> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Yes. And in that case, if you just care about only the last few days of
>> max, then you should set watermark on the timestamp column.
>>
>>  *trainTimesDataset*
>> *  .withWatermark("**activity_timestamp", "5 days")*
>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train",
>> "dest")*
>> *  .max("time")*
>>
>> Any counts which are more than 5 days old will be dropped from the
>> streaming state.
>>
>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali  wrote:
>>
>>> Hi,
>>>
>>> Thanks for the response. Since this is a streaming based query and in my
>>> case I need to hold state for 24 hours which I forgot to mention in my
>>> previous email. can I do ?
>>>
>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
>>> hours"), "train", "dest").max("time")*
>>>
>>>
>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>>>> Int, dest: String, time: Timestamp]*
>>>>
>>>>
>>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>>
>>>>
>>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>>>> train, dest"* // after calling
>>>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>>>
>>>>
>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am wondering what is the easiest and concise way to express the
>>>>> computation below in Spark Structured streaming given that it supports
>>>>> both imperative and declarative styles?
>>>>> I am just trying to select rows that has max timestamp for each train?
>>>>> Instead of doing some sort of nested queries like we normally do in any
>>>>> relational database I am trying to see if I can leverage both imperative
>>>>> and declarative at the same time. If nested queries or join are not
>>>>> required then I would like to see how this can be possible? I am using
>>>>> spark 2.1.1.
>>>>>
>>>>> Dataset
>>>>>
>>>>> Train  Dest  Time
>>>>> 1      HK    10:00
>>>>> 1      SH    12:00
>>>>> 1      SZ    14:00
>>>>> 2      HK    13:00
>>>>> 2      SH    09:00
>>>>> 2      SZ    07:00
>>>>>
>>>>> The desired result should be:
>>>>>
>>>>> Train  Dest  Time
>>>>> 1      SZ    14:00
>>>>> 2      HK    13:00


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Can I do subqueries using the Dataset or DataFrame APIs rather than raw SQL,
which I would only use as a last resort?

On Tue, Aug 29, 2017 at 8:47 PM, ayan guha  wrote:

> This is not correct. In SQL Land, your query should be like below:
>
> select * from (
> select Train,DEST,TIME, row_number() over (partition by train order by
> time desc) r
> from TrainTable
> ) x where r=1
>
> All the constructs supported in dataframe functions.
>
> On Wed, Aug 30, 2017 at 1:08 PM, kant kodali  wrote:
>
>> yes in a relational db one could just do this
>>
>> SELECT t.Train, t.Dest, r.MaxTime
>> FROM (
>>   SELECT Train, MAX(Time) as MaxTime
>>   FROM TrainTable
>>   GROUP BY Train) r
>> INNER JOIN TrainTable t
>> ON t.Train = r.Train AND t.Time = r.MaxTime
>>
>> (copied answer from a Stack overflow question someone asked)
>>
>> but still thinking how to do this in a streaming setting?
>> mapGroupWithState looks interesting but its just not available in 2.1.1 If
>> I am not wrong.
>>
>> Thanks!
>>
>> On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz  wrote:
>>
>>> That just gives you the max time for each train. If I understood the
>>> question correctly, OP wants the whole row with the max time. That's
>>> generally solved through joins or subqueries, which would be hard to do in
>>> a streaming setting
>>>

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread ayan guha
This is not correct. In SQL land, your query should look like this:

select * from (
  select Train, DEST, TIME,
         row_number() over (partition by train order by time desc) r
  from TrainTable
) x where r = 1

All of these constructs are supported in the DataFrame functions.
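
For comparison, the same "top row per train" selection can be sketched on a static (non-streaming) in-memory collection in plain Scala. This is only an illustration of what the query computes, not a streaming solution; `TrainRow` and `latestPerTrain` are hypothetical names, and `time` is assumed to be a zero-padded "HH:mm" string so that string ordering matches time ordering.

```scala
object MaxRowPerTrainSketch {
  // Hypothetical row type matching the Train / Dest / Time columns of the thread.
  final case class TrainRow(train: Int, dest: String, time: String)

  // Collection equivalent of
  // "row_number() over (partition by train order by time desc) ... where r = 1":
  // group by train, keep the row with the greatest time, order by train.
  def latestPerTrain(rows: Seq[TrainRow]): Seq[TrainRow] =
    rows.groupBy(_.train).values.map(_.maxBy(_.time)).toSeq.sortBy(_.train)
}
```

Applied to the six sample rows from the original question, this returns (1, SZ, 14:00) and (2, HK, 13:00), which matches the desired result stated there.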

On Wed, Aug 30, 2017 at 1:08 PM, kant kodali  wrote:

> yes in a relational db one could just do this
>
> SELECT t.Train, t.Dest, r.MaxTime
> FROM (
>   SELECT Train, MAX(Time) as MaxTime
>   FROM TrainTable
>   GROUP BY Train) r
> INNER JOIN TrainTable t
> ON t.Train = r.Train AND t.Time = r.MaxTime
>
> (copied answer from a Stack overflow question someone asked)
>
> but still thinking how to do this in a streaming setting?
> mapGroupWithState looks interesting but its just not available in 2.1.1 If
> I am not wrong.
>
> Thanks!
>
> On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz  wrote:
>
>> That just gives you the max time for each train. If I understood the
>> question correctly, OP wants the whole row with the max time. That's
>> generally solved through joins or subqueries, which would be hard to do in
>> a streaming setting
>>
>> On Aug 29, 2017 7:29 PM, "ayan guha"  wrote:
>>
>>> Why removing the destination from the window wont work? Like this:
>>>
>>>  *trainTimesDataset*
>>> *  .withWatermark("**activity_timestamp", "5 days")*
>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train")*
>>> *  .max("time")*
>>>
>>> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali 
>>> wrote:
>>>
 @Burak so how would the transformation or query would look like for the
 above example? I don't see flatMapGroupsWithState in the DataSet API
 Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.



 On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:

> Hey TD,
>
> If I understood the question correctly, your solution wouldn't return
> the exact solution, since it also groups by on destination. I would say 
> the
> easiest solution would be to use flatMapGroupsWithState, where you:
> .groupByKey(_.train)
>
> and keep in state the row with the maximum time.
>
> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Yes. And in that case, if you just care about only the last few days
>> of max, then you should set watermark on the timestamp column.
>>
>>  *trainTimesDataset*
>> *  .withWatermark("**activity_timestamp", "5 days")*
>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
>> "train", "dest")*
>> *  .max("time")*
>>
>> Any counts which are more than 5 days old will be dropped from the
>> streaming state.
>>
>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali 
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the response. Since this is a streaming based query and
>>> in my case I need to hold state for 24 hours which I forgot to mention 
>>> in
>>> my previous email. can I do ?
>>>
>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
>>> "24 hours"), "train", "dest").max("time")*
>>>
>>>
>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
 Int, dest: String, time: Timestamp] *


 *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*


 *SQL*: *"select train, dest, max(time) from trainTimesView group
 by train, dest"*// after calling
 *trainTimesData.createOrReplaceTempView(trainTimesView)*


 On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
 wrote:

> Hi All,
>
> I am wondering what is the easiest and concise way to express the
> computation below in Spark Structured streaming given that it 
> supports both
> imperative and declarative styles?
> I am just trying to select rows that has max timestamp for each
> train? Instead of doing some sort of nested queries like we normally 
> do in
> any relational database I am trying to see if I can leverage both
> imperative and declarative at the same time. If nested queries or 
> join are
> not required then I would like to see how this can be possible? I am 
> using
> spark 2.1.1.
>
> Dataset
>
> Train  Dest  Time
> 1      HK    10:00
> 1      SH    12:00
> 1      SZ    14:00
> 2      HK    13:00
> 2      SH    09:00
> 2      SZ    07:00
>
> The desired result should be:
>
> TrainDest  Time1SZ14:002 

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Yes, in a relational DB one could just do this:

SELECT t.Train, t.Dest, r.MaxTime
FROM (
  SELECT Train, MAX(Time) AS MaxTime
  FROM TrainTable
  GROUP BY Train
) r
INNER JOIN TrainTable t
  ON t.Train = r.Train AND t.Time = r.MaxTime

(copied from an answer to a Stack Overflow question someone asked)

But I am still thinking about how to do this in a streaming setting.
mapGroupsWithState looks interesting, but it's just not available in 2.1.1,
if I am not wrong.

Thanks!

On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz  wrote:

> That just gives you the max time for each train. If I understood the
> question correctly, OP wants the whole row with the max time. That's
> generally solved through joins or subqueries, which would be hard to do in
> a streaming setting.


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
That just gives you the max time for each train. If I understood the
question correctly, OP wants the whole row with the max time. That's
generally solved through joins or subqueries, which would be hard to do in
a streaming setting.
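
To make the distinction concrete, here is a minimal sketch using plain Scala collections rather than the Spark API. The TrainTime case class, the object and function names, and the use of zero-padded "HH:mm" strings for the time column are assumptions made for illustration; the rows are the sample data from the original question.

```scala
object MaxTimeVsMaxRow {
  // Hypothetical row type; time is a zero-padded "HH:mm" string so that
  // plain string ordering matches chronological ordering.
  final case class TrainTime(train: Int, dest: String, time: String)

  val rows: Seq[TrainTime] = Seq(
    TrainTime(1, "HK", "10:00"), TrainTime(1, "SH", "12:00"), TrainTime(1, "SZ", "14:00"),
    TrainTime(2, "HK", "13:00"), TrainTime(2, "SH", "09:00"), TrainTime(2, "SZ", "07:00"))

  // Plain aggregate: only the max time per train survives, dest is lost.
  def maxTimePerTrain(rs: Seq[TrainTime]): Map[Int, String] =
    rs.groupBy(_.train).map { case (train, group) => train -> group.map(_.time).max }

  // What the original question asks for: the entire row holding the max time.
  def maxRowPerTrain(rs: Seq[TrainTime]): Map[Int, TrainTime] =
    rs.groupBy(_.train).map { case (train, group) => train -> group.maxBy(_.time) }

  def main(args: Array[String]): Unit = {
    println(maxTimePerTrain(rows))
    println(maxRowPerTrain(rows))
  }
}
```

The second function is the one that matches the desired result in the question; the first corresponds to a group-by followed by a max on the time column alone.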

On Aug 29, 2017 7:29 PM, "ayan guha"  wrote:

> Why wouldn't removing the destination from the groupBy work? Like this:
>
> // with import org.apache.spark.sql.functions._
> trainTimesDataset
>   .withWatermark("activity_timestamp", "5 days")
>   .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"), col("train"))
>   .agg(max("time"))


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread ayan guha
Why wouldn't removing the destination from the groupBy work? Like this:

// with import org.apache.spark.sql.functions._
trainTimesDataset
  .withWatermark("activity_timestamp", "5 days")
  .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"), col("train"))
  .agg(max("time"))

On Wed, Aug 30, 2017 at 10:38 AM, kant kodali  wrote:

> @Burak, so what would the transformation or query look like for the
> above example? I don't see flatMapGroupsWithState in the Dataset API in
> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.


-- 
Best Regards,
Ayan Guha


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
@Burak, so what would the transformation or query look like for the
above example? I don't see flatMapGroupsWithState in the Dataset API in
Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.



On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:

> Hey TD,
>
> If I understood the question correctly, your solution wouldn't return the
> exact result, since it also groups by destination. I would say the
> easiest solution would be to use flatMapGroupsWithState, where you call:
> .groupByKey(_.train)
>
> and keep in state the row with the maximum time.


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
Hey TD,

If I understood the question correctly, your solution wouldn't return the
exact result, since it also groups by destination. I would say the
easiest solution would be to use flatMapGroupsWithState, where you call:
.groupByKey(_.train)

and keep in state the row with the maximum time.
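
The "keep in state the row with the maximum time" step could be sketched as a plain Scala function, independent of Spark. This is only an illustration under stated assumptions: the TrainTime case class, the updateMax name, and the "HH:mm" string representation of time are hypothetical and not taken from the thread.

```scala
object MaxRowState {
  // Hypothetical row type; time is a zero-padded "HH:mm" string, so
  // String.compareTo agrees with chronological order.
  final case class TrainTime(train: Int, dest: String, time: String)

  // Given the row currently held in state for one train (if any) and the
  // rows that just arrived for that train, return the row to keep in state.
  def updateMax(current: Option[TrainTime], rows: Iterator[TrainTime]): Option[TrainTime] =
    rows.foldLeft(current) {
      case (Some(best), row) => Some(if (row.time.compareTo(best.time) > 0) row else best)
      case (None, row)       => Some(row)
    }

  def main(args: Array[String]): Unit = {
    // Two simulated micro-batches for train 1.
    val batch1 = Iterator(TrainTime(1, "HK", "10:00"), TrainTime(1, "SH", "12:00"))
    val batch2 = Iterator(TrainTime(1, "SZ", "14:00"))

    val afterBatch1 = updateMax(None, batch1)
    val afterBatch2 = updateMax(afterBatch1, batch2)
    println(afterBatch2)
  }
}
```

In Spark 2.2 and later, where flatMapGroupsWithState is available, logic of this shape would presumably sit inside the function passed to it after groupByKey(_.train), reading the previous row with GroupState.getOption and storing the new one with GroupState.update.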

On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das 
wrote:

> Yes. And in that case, if you only care about the max over the last few
> days, you should set a watermark on the timestamp column.
>
> // with import org.apache.spark.sql.functions._
> trainTimesDataset
>   .withWatermark("activity_timestamp", "5 days")
>   .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"),
>     col("train"), col("dest"))
>   .agg(max("time"))
>
> Any aggregates that are more than 5 days old will be dropped from the
> streaming state.


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Yes. And in that case, if you only care about the max over the last few
days, you should set a watermark on the timestamp column.

// with import org.apache.spark.sql.functions._
trainTimesDataset
  .withWatermark("activity_timestamp", "5 days")
  .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"),
    col("train"), col("dest"))
  .agg(max("time"))

Any aggregates that are more than 5 days old will be dropped from the
streaming state.
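
As a rough mental model of how a 24-hour tumbling window and a 5-day watermark interact, the following plain Scala sketch may help. It is a simplified illustration, not Spark's implementation: all names are hypothetical, timestamps are epoch milliseconds, windows are assumed to be aligned to the epoch, and the real engine only advances the watermark between micro-batches.

```scala
object WatermarkSketch {
  val DayMs: Long = 24L * 60 * 60 * 1000

  // Start of the 24-hour tumbling window that contains the event time.
  def windowStart(eventMs: Long): Long = eventMs - Math.floorMod(eventMs, DayMs)

  // Watermark: the latest event time seen so far minus the allowed delay.
  def watermark(maxEventMs: Long, delayMs: Long): Long = maxEventMs - delayMs

  // In this model a window's state can be dropped once the watermark
  // has reached the end of that window.
  def canDrop(windowStartMs: Long, watermarkMs: Long): Boolean =
    windowStartMs + DayMs <= watermarkMs

  def main(args: Array[String]): Unit = {
    // Latest event seen is just past day 10; allowed delay is 5 days.
    val wm = watermark(maxEventMs = 10 * DayMs + 1000, delayMs = 5 * DayMs)
    println(canDrop(windowStart(3 * DayMs + 42), wm)) // window of day 3
    println(canDrop(windowStart(5 * DayMs + 42), wm)) // window of day 5
  }
}
```

Under this model the day-3 window ends before the watermark and its state is eligible for cleanup, while the day-5 window is still open to late data.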

On Tue, Aug 29, 2017 at 2:06 PM, kant kodali  wrote:

> Hi,
>
> Thanks for the response. This is a streaming query, and in my case I need
> to hold state for 24 hours, which I forgot to mention in my previous
> email. Can I do the following?
>
> // with import org.apache.spark.sql.functions._
> trainTimesDataset
>   .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"),
>     col("train"), col("dest"))
>   .agg(max("time"))


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Hi,

Thanks for the response. This is a streaming query, and in my case I need
to hold state for 24 hours, which I forgot to mention in my previous
email. Can I do the following?

// with import org.apache.spark.sql.functions._
trainTimesDataset
  .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"),
    col("train"), col("dest"))
  .agg(max("time"))


On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das 
wrote:

> Say trainTimesDataset is the streaming Dataset with schema
> [train: Int, dest: String, time: Timestamp].
>
> Scala (with import org.apache.spark.sql.functions.max):
> trainTimesDataset.groupBy("train", "dest").agg(max("time"))
>
> SQL, after calling trainTimesDataset.createOrReplaceTempView("trainTimesView"):
> "select train, dest, max(time) from trainTimesView group by train, dest"


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Say trainTimesDataset is the streaming Dataset with schema
[train: Int, dest: String, time: Timestamp].

Scala (with import org.apache.spark.sql.functions.max):
trainTimesDataset.groupBy("train", "dest").agg(max("time"))

SQL, after calling trainTimesDataset.createOrReplaceTempView("trainTimesView"):
"select train, dest, max(time) from trainTimesView group by train, dest"


On Tue, Aug 29, 2017 at 12:59 PM, kant kodali  wrote:

> Hi All,
>
> I am wondering what is the easiest and most concise way to express the
> computation below in Spark Structured Streaming, given that it supports both
> imperative and declarative styles.
> I am just trying to select the rows that have the max timestamp for each
> train. Instead of doing some sort of nested query like we normally do in any
> relational database, I am trying to see if I can leverage both imperative
> and declarative styles at the same time. If nested queries or joins are not
> required, then I would like to see how this can be done. I am using
> Spark 2.1.1.
>
> Dataset
>
> Train  Dest  Time
> 1      HK    10:00
> 1      SH    12:00
> 1      SZ    14:00
> 2      HK    13:00
> 2      SH    09:00
> 2      SZ    07:00
>
> The desired result should be:
>
> Train  Dest  Time
> 1      SZ    14:00
> 2      HK    13:00
>
>


How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Hi All,

I am wondering what is the easiest and most concise way to express the
computation below in Spark Structured Streaming, given that it supports both
imperative and declarative styles.
I am just trying to select the rows that have the max timestamp for each
train. Instead of doing some sort of nested query like we normally do in any
relational database, I am trying to see if I can leverage both imperative
and declarative styles at the same time. If nested queries or joins are not
required, then I would like to see how this can be done. I am using
Spark 2.1.1.

Dataset

Train  Dest  Time
1      HK    10:00
1      SH    12:00
1      SZ    14:00
2      HK    13:00
2      SH    09:00
2      SZ    07:00

The desired result should be:

Train  Dest  Time
1      SZ    14:00
2      HK    13:00