Re: Spark | Window Function |

2017-07-19 Thread Julien CHAMP
Hi, and thanks a lot for your example!

OK, I've found my problem.
There was too much data (1000 ids / 1000 timestamps) for my test, and it
does not seem to work in such cases :/
It does not seem to scale linearly with the number of ids. With a small
example of 1000 timestamps per id:
- 50 ids: around 1 min to compute
- 100 ids: around 4 min to compute

I've tried to repartition my data without any success :(
As my use cases can grow to a really large number of ids / timestamps, this
is problematic for me.
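
One detail that may matter here: the self-join in the example only constrains the
timestamps, so with many ids every row gets compared against rows of every other id,
which could explain the super-linear times above. A rough, untested sketch of the same
self-join with the id added to the join condition (column names as in the example;
minPerIdWindow is just an illustrative name):

import org.apache.spark.sql.{DataFrame, functions}

// Sketch only: the same self-join as in the example, but with the id added to the
// join condition so each row is only compared with rows carrying the same id.
def minPerIdWindow(df: DataFrame, windowMs: Long): DataFrame = {
  val spark = df.sparkSession
  import spark.implicits._
  df.as("df1").join(df.as("df2"),
      $"df1.id" === $"df2.id" &&
        $"df2.timestamp".between($"df1.timestamp" - windowMs, $"df1.timestamp"))
    .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
    .agg(functions.min($"df2.value").as("min___value"))
}

With the equality on id in the join condition, Spark can plan an equi-join keyed on id
rather than comparing everything with everything, which usually matters once the number
of ids grows.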

Regards
Julien

Re: Spark | Window Function |

2017-07-18 Thread Radhwane Chebaane
Hi Julien,

Could you give more details about the problems you faced?
Here is a working example with the Spark DataFrame API and Spark SQL:
https://gist.github.com/radcheb/d16042d8bb3815d3dd42030ecedc43cf


Cheers,
Radhwane Chebaane


2017-07-18 18:21 GMT+02:00 Julien CHAMP :

> Hi Radhwane!
>
> I've tested both your solutions, using the DataFrame API and Spark SQL... and in
> both cases Spark is stuck :/
> Did you test the code you gave me? I don't know if I've done
> something wrong...
>
> Regards,
> Julien
>
> On Mon, 10 Jul 2017 at 10:53, Radhwane Chebaane wrote:
>
>> Hi Julien,
>>
>>
>> - Usually, window functions require less shuffle than a cross join, so they are
>> a little faster depending on the use case. For large windows, cross-join and
>> window-function performance are close.
>> - You can use UDFs and UDAFs as in any Spark SQL query (a Geometric Mean was
>> tested successfully; see the sketch below).
>>
>> Regards,
>> Radhwane
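
For reference, a rough sketch of what such a geometric-mean UDAF could look like with
the Spark 2.x UserDefinedAggregateFunction API (modeled on the Databricks UDAF example
linked elsewhere in this thread, using a mean of logs for numerical stability; it is an
illustration, not necessarily the implementation that was tested):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Geometric mean computed as exp(mean(log(x))): the buffer keeps a sum of logs and a count.
class GeometricMean extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("sumOfLogs", DoubleType) :: StructField("count", LongType) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
    buffer(1) = 0L
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + math.log(input.getDouble(0))
      buffer(1) = buffer.getLong(1) + 1L
    }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  def evaluate(buffer: Row): Double =
    if (buffer.getLong(1) == 0L) Double.NaN
    else math.exp(buffer.getDouble(0) / buffer.getLong(1))
}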

Re: Spark | Window Function |

2017-07-06 Thread Julien CHAMP
Thanks a lot for your answer, Radhwane :)


I have some (many) use cases that need Long ranges in window functions. As
said in the bug report, I can store events in ms in a DataFrame and want
to count the number of events in the past 10 years (which requires a Long value).

-> *Let's imagine that this window is used on timestamp values in ms: I
can ask for a window with a range between [-216000L, 0] and only have a
few values inside, not necessarily 216000L of them. I can understand the
limitation for the rowsBetween() method, but the rangeBetween() method is nice
for this kind of usage.*


The solution with a self-join seems nice, but I have 2 questions:

- regarding performance, will it be as fast as a window function?

- can I use my own aggregate function (for example a Geometric Mean) with
your solution, using this:
https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html ?
(A usage sketch follows below.)
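
For illustration, plugging a custom UDAF such as the GeometricMean sketched earlier in
the thread into the self-join aggregation would presumably look roughly like this
(untested sketch; the geo_mean_value column name is mine, and the cast guards against
the UDAF expecting doubles):

// Sketch: a custom UDAF used in place of functions.min in the self-join aggregation.
val geoMean = new GeometricMean
df.as("df1").join(df.as("df2"),
    $"df2.timestamp".between($"df1.timestamp" - 20L, $"df1.timestamp"))
  .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
  .agg(geoMean($"df2.value".cast("double")).as("geo_mean_value"))
  .orderBy($"df1.timestamp")
  .show()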



Thanks again,

Regards,


Julien



Le mer. 5 juil. 2017 à 19:18, Radhwane Chebaane 
a écrit :

> Hi Julien,
>
>
> Although this is a strange bug in Spark, it's rare to need more than
> Integer max value size for a window.
>
> Nevertheless, most of the window functions can be expressed with
> self-joins. Hence, your problem may be solved with this example:
>
> If input data as follow:
>
> +---+-+-+
> | id|timestamp|value|
> +---+-+-+
> |  B|1|  100|
> |  B|10010|   50|
> |  B|10020|  200|
> |  B|25000|  500|
> +---+-+-+
>
> And the window is  (-20L, 0)
>
> Then this code will give the wanted result:
>
> df.as("df1").join(df.as("df2"),
>   $"df2.timestamp" between($"df1.timestamp" - 20L, $"df1.timestamp"))
>   .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
>   .agg( functions.min($"df2.value").as("min___value"))
>   .orderBy($"df1.timestamp")
>   .show()
>
> +---+-+-+---+
> | id|timestamp|value|min___value|
> +---+-+-+---+
> |  B|1|  100|100|
> |  B|10010|   50| 50|
> |  B|10020|  200| 50|
> |  B|25000|  500|500|
> +---+-+-+---+
>
> Or by SparkSQL:
>
> SELECT c.id as id, c.timestamp as timestamp, c.value, min(c._value) as 
> min___value FROM
> (
>   SELECT a.id as id, a.timestamp as timestamp, a.value as value, b.timestamp 
> as _timestamp, b.value as _value
>   FROM df a CROSS JOIN df b
>   ON b.timestamp >= a.timestamp - 20L and b.timestamp <= a.timestamp
> ) c
> GROUP BY c.id, c.timestamp, c.value ORDER BY c.timestamp
>
>
> This must be also possible also on Spark Streaming however don't expect high 
> performance.
>
>
> Cheers,
> Radhwane
>
>
>
> 2017-07-05 10:41 GMT+02:00 Julien CHAMP :
>
>> Hi there !
>>
>> Let me explain my problem to see if you have a good solution to help me :)
>>
>> Let's imagine that I have all my data in a DB or a file, that I load in a
>> dataframe DF with the following columns :
>> *id | timestamp(ms) | value*
>> A | 100 |  100
>> A | 110 |  50
>> B | 100 |  100
>> B | 110 |  50
>> B | 120 |  200
>> B | 250 |  500
>> C | 100 |  200
>> C | 110 |  500
>>
>> The timestamp is a *long value*, so as to be able to express date in ms
>> from -01-01 to today !
>>
>> I want to compute operations such as min, max, average on the *value
>> column*, for a given window function, and grouped by id ( Bonus :  if
>> possible for only some timestamps... )
>>
>> For example if I have 3 tuples :
>>
>> id | timestamp(ms) | value
>> B | 100 |  100
>> B | 110 |  50
>> B | 120 |  200
>> B | 250 |  500
>>
>> I would like to be able to compute the min value for windows of time =
>> 20. This would result in such a DF :
>>
>> id | timestamp(ms) | value | min___value
>> B | 100 |  100 | 100
>> B | 110 |  50  | 50
>> B | 120 |  200 | 50
>> B | 250 |  500 | 500
>>
>> This seems the perfect use case for window function in spark  ( cf :
>> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
>>  )
>> I can use :
>>
>> Window.orderBy("timestamp").partitionBy("id").rangeBetween(-20,0)
>> df.withColumn("min___value", min(df.col("value")).over(tw))
>>
>> This leads to the perfect answer !
>>
>> However, there is a big bug with window functions as reported here (
>> https://issues.apache.org/jira/browse/SPARK-19451 ) when working with
>> Long values !!! So I can't use this
>>
>> So my question is ( of course ) how can I resolve my problem ?
>> If I use spark streaming I will face the same issue ?
>>
>> I'll be glad to discuss this problem with you, feel free to answer :)
>>
>> Regards,
>>
>> Julien
>> --
>>
>>
>> Julien CHAMP — Data Scientist
>>
>>
>> *Web : **www.tellmeplus.com*  — *Email : 
>> **jch...@tellmeplus.com
>> *
>>
>> *Phone ** : **06 89 35 01 89 <0689350189> * — *LinkedIn* : 

Re: Spark | Window Function |

2017-07-05 Thread Radhwane Chebaane
Hi Julien,


Although this is a strange bug in Spark, it's rare to need a window range
larger than the Integer max value.

Nevertheless, most window functions can be expressed with
self-joins. Hence, your problem may be solved with this example:

If the input data is as follows:

+---+---------+-----+
| id|timestamp|value|
+---+---------+-----+
|  B|        1|  100|
|  B|    10010|   50|
|  B|    10020|  200|
|  B|    25000|  500|
+---+---------+-----+
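
(To reproduce this input locally, one could build the DataFrame with something like the
following sketch, assuming a SparkSession named spark:)

import spark.implicits._  // assumes a SparkSession named `spark` is in scope

val df = Seq(
  ("B", 1L,     100),
  ("B", 10010L,  50),
  ("B", 10020L, 200),
  ("B", 25000L, 500)
).toDF("id", "timestamp", "value")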

And the window is (-20L, 0).

Then this code gives the wanted result:

df.as("df1").join(df.as("df2"),
  $"df2.timestamp" between($"df1.timestamp" - 20L, $"df1.timestamp"))
  .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
  .agg( functions.min($"df2.value").as("min___value"))
  .orderBy($"df1.timestamp")
  .show()

+---+---------+-----+-----------+
| id|timestamp|value|min___value|
+---+---------+-----+-----------+
|  B|        1|  100|        100|
|  B|    10010|   50|         50|
|  B|    10020|  200|         50|
|  B|    25000|  500|        500|
+---+---------+-----+-----------+

Or by SparkSQL:

SELECT c.id as id, c.timestamp as timestamp, c.value, min(c._value) as
min___value FROM
(
  SELECT a.id as id, a.timestamp as timestamp, a.value as value,
b.timestamp as _timestamp, b.value as _value
  FROM df a CROSS JOIN df b
  ON b.timestamp >= a.timestamp - 20L and b.timestamp <= a.timestamp
) c
GROUP BY c.id, c.timestamp, c.value ORDER BY c.timestamp
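
(The SQL above refers to a table named df, so the DataFrame would first be registered
as a temporary view under that name; a small sketch, with sqlText standing in for the
statement above:)

df.createOrReplaceTempView("df")   // expose the DataFrame as the table "df" used in the query
spark.sql(sqlText).show()          // sqlText: a String holding the SQL statement above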


This should also be possible with Spark Streaming, but don't expect high performance.


Cheers,
Radhwane




Spark | Window Function |

2017-07-05 Thread Julien CHAMP
Hi there!

Let me explain my problem and see if you have a good solution to help me :)

Let's imagine that I have all my data in a DB or a file, which I load into a
DataFrame DF with the following columns:
*id | timestamp(ms) | value*
A | 100 |  100
A | 110 |  50
B | 100 |  100
B | 110 |  50
B | 120 |  200
B | 250 |  500
C | 100 |  200
C | 110 |  500
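
(In code, that input could be built roughly as follows; a sketch assuming a
SparkSession named spark:)

import spark.implicits._  // assumes a SparkSession named `spark` is in scope

val df = Seq(
  ("A", 100L, 100), ("A", 110L,  50),
  ("B", 100L, 100), ("B", 110L,  50), ("B", 120L, 200), ("B", 250L, 500),
  ("C", 100L, 200), ("C", 110L, 500)
).toDF("id", "timestamp", "value")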

The timestamp is a *long value*, so as to be able to express dates in ms
from -01-01 to today!

I want to compute operations such as min, max, and average on the *value column*,
for a given time window, grouped by id (bonus: if possible, for
only some timestamps...)

For example, if I have these tuples for id B:

id | timestamp(ms) | value
B | 100 |  100
B | 110 |  50
B | 120 |  200
B | 250 |  500

I would like to be able to compute the min value over a time window of 20 ms.
This would result in such a DF:

id | timestamp(ms) | value | min___value
B | 100 |  100 | 100
B | 110 |  50  | 50
B | 120 |  200 | 50
B | 250 |  500 | 500

This seems like the perfect use case for a window function in Spark (cf:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
)
I can use:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.min
val tw = Window.partitionBy("id").orderBy("timestamp").rangeBetween(-20, 0)
df.withColumn("min___value", min(df.col("value")).over(tw))

This leads to the perfect answer !

However, there is a big bug with window functions, as reported here (
https://issues.apache.org/jira/browse/SPARK-19451 ), when working with Long
values, so I can't use this approach.

So my question is (of course): how can I solve my problem?
Will I face the same issue if I use Spark Streaming?

I'll be glad to discuss this problem with you, feel free to answer :)

Regards,

Julien