Re: Spark | Window Function |

2017-07-18 Thread Radhwane Chebaane
Hi Julien,

Could you give more details about the problems you faced?
Here is a working example with Spark dataframe and Spark SQL:
https://gist.github.com/radcheb/d16042d8bb3815d3dd42030ecedc43cf


Cheers,
Radhwane Chebaane


2017-07-18 18:21 GMT+02:00 Julien CHAMP <jch...@tellmeplus.com>:

> Hi Radhwane !
>
> I've tested both of your solutions, using the dataframe API and Spark SQL... and in
> both cases Spark gets stuck :/
> Did you test the code that you gave me? I don't know if I've done
> something wrong...
>
> Regards,
> Julien
>
> Le lun. 10 juil. 2017 à 10:53, Radhwane Chebaane <r.cheba...@mindlytix.com>
> a écrit :
>
>> Hi Julien,
>>
>>
>> - Usually, window functions require less shuffle than a cross join, so they are
>> a little faster depending on the use case. For large windows, cross join and
>> window function performance are close.
>> - You can use UDFs and UDAFs as in any Spark SQL query (a Geometric Mean was
>> tested successfully; a sketch follows below).
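>>
>> For illustration, a geometric-mean UDAF in the style of the Databricks UDAF example
>> plugs straight into the self-join aggregation below; this is only a sketch (the class
>> name GeometricMean and the cast to double are just for this example):
>>
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
>> import org.apache.spark.sql.types._
>> import spark.implicits._   // for the $-column syntax (assumes a SparkSession named spark)
>>
>> // Geometric mean of n values = n-th root of their product.
>> class GeometricMean extends UserDefinedAggregateFunction {
>>   def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
>>   def bufferSchema: StructType = StructType(
>>     StructField("count", LongType) :: StructField("product", DoubleType) :: Nil)
>>   def dataType: DataType = DoubleType
>>   def deterministic: Boolean = true
>>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>>     buffer(0) = 0L
>>     buffer(1) = 1.0
>>   }
>>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>>     buffer(0) = buffer.getLong(0) + 1
>>     buffer(1) = buffer.getDouble(1) * input.getDouble(0)
>>   }
>>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>>     buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
>>     buffer1(1) = buffer1.getDouble(1) * buffer2.getDouble(1)
>>   }
>>   def evaluate(buffer: Row): Double = math.pow(buffer.getDouble(1), 1.0 / buffer.getLong(0))
>> }
>>
>> // Same self-join as below, but aggregating with the UDAF instead of min().
>> val gm = new GeometricMean
>> df.as("df1").join(df.as("df2"),
>>     $"df2.timestamp" between($"df1.timestamp" - 20L, $"df1.timestamp"))
>>   .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
>>   .agg(gm($"df2.value".cast("double")).as("geo_mean_value"))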
>>
>> Regards,
>> Radhwane
>>
>> 2017-07-06 16:22 GMT+02:00 Julien CHAMP <jch...@tellmeplus.com>:
>>
>>> Thx a lot for your answer Radhwane :)
>>>
>>>
>>> I have some (many) use cases with such a need for Long ranges in window functions.
>>> As said in the bug report, I can store events in ms in a dataframe, and
>>> want to count the number of events in the past 10 years (requiring a Long
>>> value).
>>>
>>> -> *Let's imagine that this window is used on timestamp values in ms:
>>> I can ask for a window with a range between [-216000L, 0] and only have
>>> a few values inside, not necessarily 216000L. I can understand the
>>> limitation for the rowBetween() method, but the rangeBetween() method is nice
>>> for this kind of usage.*
>>>
>>>
>>> The solution with a self join seems nice, but 2 questions:
>>>
>>> - regarding performance, will it be as fast as a window function?
>>>
>>> - can I use my own aggregate function (for example a Geometric Mean)
>>> with your solution? (using this:
>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html ?)
>>>
>>>
>>>
>>> Thanks again,
>>>
>>> Regards,
>>>
>>>
>>> Julien
>>>
>>>
>>>
>>> Le mer. 5 juil. 2017 à 19:18, Radhwane Chebaane <
>>> r.cheba...@mindlytix.com> a écrit :
>>>
>>>> Hi Julien,
>>>>
>>>>
>>>> Although this is a strange bug in Spark, it's rare to need a window range
>>>> larger than the Integer max value.
>>>>
>>>> Nevertheless, most window functions can be expressed with
>>>> self-joins. Hence, your problem may be solved with this example:
>>>>
>>>> If the input data is as follows:
>>>>
>>>> +---+---------+-----+
>>>> | id|timestamp|value|
>>>> +---+---------+-----+
>>>> |  B|        1|  100|
>>>> |  B|    10010|   50|
>>>> |  B|    10020|  200|
>>>> |  B|    25000|  500|
>>>> +---+---------+-----+
>>>>
>>>> And the window is  (-20L, 0)
>>>>
>>>> Then this code will give the wanted result:
>>>>
>>>> df.as("df1").join(df.as("df2"),
>>>>     $"df2.timestamp" between($"df1.timestamp" - 20L, $"df1.timestamp"))
>>>>   .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
>>>>   .agg(functions.min($"df2.value").as("min___value"))
>>>>   .orderBy($"df1.timestamp")
>>>>   .show()
>>>>
>>>> +---+---------+-----+-----------+
>>>> | id|timestamp|value|min___value|
>>>> +---+---------+-----+-----------+
>>>> |  B|        1|  100|        100|
>>>> |  B|    10010|   50|         50|
>>>> |  B|    10020|  200|         50|
>>>> |  B|    25000|  500|        500|
>>>> +---+---------+-----+-----------+
>>>>
>>>> Or by SparkSQL:
>>>>
>>>> SELECT c.id as id, c.timestamp as timestamp, c.value,
>>>>        min(c._value) as min___value
>>>> FROM
>>>> (
>>>>   SELECT a.id as id, a.timestamp as timestamp, a.value as value,
>>>>          b.timestamp as _timestamp, b.value as _value
>>>>   FROM df a CROSS JOIN df b
>>>>   ON b.timestamp >= a.timestamp - 20L AND b.timestamp <= a.timestamp
>>>> ) c
>>>> GROUP BY c.id, c.timestamp, c.value
>>>> ORDER BY c.timestamp

Re: Spark | Window Function |

2017-07-05 Thread Radhwane Chebaane
Hi Julien,


Although this is a strange bug in Spark, it's rare to need a window range
larger than the Integer max value.

Nevertheless, most window functions can be expressed with
self-joins. Hence, your problem may be solved with this example:

If the input data is as follows:

+---+---------+-----+
| id|timestamp|value|
+---+---------+-----+
|  B|        1|  100|
|  B|    10010|   50|
|  B|    10020|  200|
|  B|    25000|  500|
+---+---------+-----+

And the window is  (-20L, 0)

Then this code will give the wanted result:

df.as("df1").join(df.as("df2"),
  $"df2.timestamp" between($"df1.timestamp" - 20L, $"df1.timestamp"))
  .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
  .agg( functions.min($"df2.value").as("min___value"))
  .orderBy($"df1.timestamp")
  .show()

+---+---------+-----+-----------+
| id|timestamp|value|min___value|
+---+---------+-----+-----------+
|  B|        1|  100|        100|
|  B|    10010|   50|         50|
|  B|    10020|  200|         50|
|  B|    25000|  500|        500|
+---+---------+-----+-----------+

Or by SparkSQL:

SELECT c.id as id, c.timestamp as timestamp, c.value,
       min(c._value) as min___value
FROM
(
  SELECT a.id as id, a.timestamp as timestamp, a.value as value,
         b.timestamp as _timestamp, b.value as _value
  FROM df a CROSS JOIN df b
  ON b.timestamp >= a.timestamp - 20L AND b.timestamp <= a.timestamp
) c
GROUP BY c.id, c.timestamp, c.value
ORDER BY c.timestamp
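
To run the SQL version, the DataFrame first has to be registered under the table
name used in the query. A minimal sketch, assuming Spark 2.x and a SparkSession
named spark, with windowQuery standing for the SQL string above:

// Register the DataFrame under the name "df" used in the query
// (Spark 2.x API; on Spark 1.6 use df.registerTempTable("df") instead).
df.createOrReplaceTempView("df")
spark.sql(windowQuery).show()   // windowQuery is assumed to hold the query text above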


This should also be possible with Spark Streaming, but don't
expect high performance.


Cheers,
Radhwane



2017-07-05 10:41 GMT+02:00 Julien CHAMP <jch...@tellmeplus.com>:

> Hi there !
>
> Let me explain my problem to see if you have a good solution to help me :)
>
> Let's imagine that I have all my data in a DB or a file, which I load into a
> dataframe DF with the following columns :
> *id | timestamp(ms) | value*
> A | 100 |  100
> A | 110 |  50
> B | 100 |  100
> B | 110 |  50
> B | 120 |  200
> B | 250 |  500
> C | 100 |  200
> C | 110 |  500
>
> The timestamp is a *long value*, so as to be able to express dates in ms
> from -01-01 to today !
>
> I want to compute operations such as min, max, average on the *value
> column*, for a given window function, and grouped by id ( Bonus :  if
> possible for only some timestamps... )
>
> For example if I have 3 tuples :
>
> id | timestamp(ms) | value
> B | 100 |  100
> B | 110 |  50
> B | 120 |  200
> B | 250 |  500
>
> I would like to be able to compute the min value for windows of time = 20.
> This would result in such a DF :
>
> id | timestamp(ms) | value | min___value
> B | 100 |  100 | 100
> B | 110 |  50  | 50
> B | 120 |  200 | 50
> B | 250 |  500 | 500
>
> This seems the perfect use case for window functions in Spark (cf:
> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html )
> I can use :
>
> val tw = Window.orderBy("timestamp").partitionBy("id").rangeBetween(-20, 0)
> df.withColumn("min___value", min(df.col("value")).over(tw))
>
> This leads to the perfect answer !
>
> However, there is a big bug with window functions as reported here (
> https://issues.apache.org/jira/browse/SPARK-19451 ) when working with
> Long values !!! So I can't use this
>
> So my question is (of course): how can I solve my problem?
> Will I face the same issue if I use Spark Streaming?
>
> I'll be glad to discuss this problem with you, feel free to answer :)
>
> Regards,
>
> Julien
> --
>
>
> Julien CHAMP — Data Scientist
>
>
> Web : www.tellmeplus.com — Email : jch...@tellmeplus.com
>
> Phone : 06 89 35 01 89 — LinkedIn : https://www.linkedin.com/in/julienchamp
>
> TellMePlus S.A — Predictive Objects
>
> Paris : 7 rue des Pommerots, 78400 Chatou
> Montpellier : 51 impasse des églantiers, 34980 St Clément de Rivière
>
>
> This email may contain confidential and/or privileged information for the
> intended recipient. If you are not the intended recipient, please contact
> the sender and delete all copies.
>
>




-- 

Radhwane Chebaane
Distributed systems engineer, Mindlytix

Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906
Skype: rad.cheb
LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b


Re: Load multiple CSV from different paths

2017-07-05 Thread Radhwane Chebaane
Hi,

Referring to the Spark 2.x documentation, org.apache.spark.sql.DataFrameReader
has this method:
def csv(paths: String*): DataFrame
<http://spark.apache.org/docs/2.1.0/api/scala/org/apache/spark/sql/package.html#DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]>

So you can unpack your Array of paths like this:

val sources = paths.split(",").toSeq

spark.read
  .option("header", "false")
  .schema(custom_schema)
  .option("delimiter", "\t")
  .option("mode", "DROPMALFORMED")
  .csv(sources: _*)


In spark 1.6.x I think this may work with spark-csv
<https://github.com/databricks/spark-csv> :

spark.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .schema(custom_schema)
  .option("delimiter", "\t")
  .option("mode", "DROPMALFORMED")
  .load(sources: _*)
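
For reference, here is a self-contained sketch of the Spark 2.x variant; the schema,
column names and paths below are made up purely for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("MultiPathCsv").getOrCreate()

// Hypothetical schema and paths, only to make the example runnable end to end.
val custom_schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("value", IntegerType, nullable = true)
))
val paths = "/data/2017/06/part1.csv,/data/2017/07/part2.csv"
val sources = paths.split(",").toSeq

val df = spark.read
  .option("header", "false")
  .schema(custom_schema)
  .option("delimiter", "\t")
  .option("mode", "DROPMALFORMED")
  .csv(sources: _*)   // DataFrameReader.csv accepts a varargs list of paths

df.show(5)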



Cheers,
Radhwane Chebaane

2017-07-05 16:08 GMT+02:00 Didac Gil <didacgil9...@gmail.com>:

> Hi,
>
> Do you know any simple way to load multiple csv files (same schema) that
> are in different paths?
> Wildcards are not a solution, as I want to load specific csv files from
> different folders.
>
> I came across a solution
> (https://stackoverflow.com/questions/37639956/how-to-import-multiple-csv-files-in-a-single-load)
> that suggests something like
>
> spark.read.format("csv").option("header", "false")
> .schema(custom_schema)
> .option('delimiter', '\t')
> .option('mode', 'DROPMALFORMED')
> .load(paths.split(','))
>
> However, even though it mentions that this approach should work in Spark 2.x, I
> can't find an implementation of load that accepts an Array[String] as an
> input parameter.
>
> Thanks in advance for your help.
>
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544
> Sweden: +46 (0)730229737
> Skype: didac.gil.de.la.iglesia
>
>


-- 

Radhwane Chebaane
Distributed systems engineer, Mindlytix

Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906
Skype: rad.cheb
LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b


Re: Cannot convert from JavaRDD to Dataframe

2017-04-24 Thread Radhwane Chebaane
Hi,

The ArrayType column maps to a Scala Array, which corresponds in Java to a plain
Java array, so you must use a String[] rather than a java.util.List. However, since
RowFactory.create expects an array of Object as the columns' content, the String[]
has to be wrapped so it is treated as a single column value:

    public Row call(String line) {
        return RowFactory.create(new String[][]{ line.split(" ") });
    }
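
If it helps, here is a rough Scala equivalent of the intended pipeline, only as a
sketch (the session name, input path and partition count are carried over from your
snippet, and the Seq-per-row construction is one way to feed an ArrayType column):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("Word2Vec").getOrCreate()

val schema = StructType(Seq(
  StructField("text", ArrayType(StringType, true), nullable = false)
))

// Each Row carries one value: the array of words for the "text" column.
val rows = spark.sparkContext.textFile("input.txt", 10)
  .map(line => Row(line.split(" ").toSeq))

val input = spark.createDataFrame(rows, schema)
input.show(3)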

More details in this Stackoverflow question
<http://stackoverflow.com/questions/43411492/createdataframe-throws-exception-when-pass-javardd-that-contains-arraytype-col/43585039#43585039>
.
Hope this works for you,

Cheers

2017-04-23 18:13 GMT+02:00 Chen, Mingrui <mingr...@mail.smu.edu>:

> Hello everyone!
>
>
> I am a new Spark learner trying to do a task that seems very simple. I want
> to read a text file, save the content to a JavaRDD and convert it to a
> Dataframe, so I can use it for a Word2Vec model in the future. The code looks
> pretty simple but I cannot make it work:
>
>
> SparkSession spark = SparkSession.builder().appName("Word2Vec").getOrCreate();
> JavaRDD<String> lines = spark.sparkContext().textFile("input.txt", 10).toJavaRDD();
> JavaRDD<Row> rows = lines.map(new Function<String, Row>(){
>     public Row call(String line){
>         return RowFactory.create(Arrays.asList(line.split(" ")));
>     }
> });
> StructType schema = new StructType(new StructField[] {
>     new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
> });
> Dataset<Row> input = spark.createDataFrame(rows, schema);
> input.show(3);
>
> It throws an exception at input.show(3):
>
>
> Caused by: java.lang.ClassCastException: cannot assign instance of
> scala.collection.immutable.List$SerializationProxy to field
> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
> scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
>
> It seems to have a problem converting the JavaRDD to a Dataframe. However, I
> cannot figure out what mistake I made here, and the exception message is
> hard to understand. Can anyone help? Thanks!
>
>


-- 

Radhwane Chebaane
Distributed systems engineer, Mindlytix

Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906
Skype: rad.cheb
LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b


[sparkR] [MLlib] : Is word2vec implemented in SparkR MLlib ?

2017-04-20 Thread Radhwane Chebaane
Hi,

I've been experimenting with the Spark *Word2vec* implementation in the
MLlib package with Scala and it was very nice.
I need to use the same algorithm in R, leveraging the power of distributed
Spark through SparkR.
I have been looking on the mailing list and Stackoverflow for any *Word2vec*
use case in SparkR, but no luck.
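
The Scala pipeline in question is along the lines of the standard
ml.feature.Word2Vec example; a minimal sketch with made-up data (column names
and parameters are illustrative only):

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Word2VecExample").getOrCreate()

// Tiny made-up corpus: one array-of-words column per document.
val docs = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn 3-dimensional word vectors and average them per document.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)

val model = word2Vec.fit(docs)
model.transform(docs).show(false)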

Is there any implementation of *Word2vec* in *SparkR* ? Is there any
current work to support this feature in MLlib with R?

Thanks!
Radhwane Chebaane

-- 

Radhwane Chebaane
Distributed systems engineer, Mindlytix

Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906
Skype: rad.cheb
LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b