Re: [Spark Streaming on Mesos (good practices)]

2015-08-24 Thread Aram Mkrtchyan
Here is the answer to my question, in case somebody else needs it:

   Running Spark in Standalone mode or coarse-grained Mesos mode leads to
   better task launch times than the fine-grained Mesos mode.

The resource is
http://spark.apache.org/docs/latest/streaming-programming-guide.html
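
For reference, a minimal sketch of how coarse-grained mode is enabled when
building the streaming context (the Mesos master URL, app name, and core cap
below are placeholders for your own cluster, not values from the docs):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-on-mesos")                      // placeholder app name
  .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos")     // placeholder Mesos master URL
  .set("spark.mesos.coarse", "true")  // coarse-grained mode: lower task launch latency
  .set("spark.cores.max", "8")        // cap total cores so the job doesn't hold the whole cluster

val ssc = new StreamingContext(conf, Seconds(10))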

On Mon, Aug 24, 2015 at 12:15 PM, Aram Mkrtchyan <
aram.mkrtchyan...@gmail.com> wrote:

> What are the best practices for submitting a Spark Streaming application on
> Mesos?
> I would like to know about the scheduler mode.
> Is `coarse-grained` mode the right solution?
>
> Thanks
>


[Spark Streaming on Mesos (good practices)]

2015-08-24 Thread Aram Mkrtchyan
What are the best practices for submitting a Spark Streaming application on Mesos?
I would like to know about the scheduler mode.
Is `coarse-grained` mode the right solution?

Thanks


Re: DataFrameWriter.jdbc is very slow

2015-08-21 Thread Aram Mkrtchyan
That's good.
Thanks for your reply, Michael.
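
In case it helps anyone else before SPARK-10040 lands, here is a rough sketch
of the batch-insert approach we found much faster (the connection URL,
credentials, source path, table and column names are placeholders, and the
batch size is just a value to tune):

import java.sql.DriverManager

val url = "jdbc:postgresql://db-host:5432/mydb"     // placeholder connection string
val src = sqlContext.read.parquet("/data/source")   // the ~20M-row Parquet source (placeholder path)

src.foreachPartition { rows =>
  // one connection and one prepared statement per partition
  val conn = DriverManager.getConnection(url, "user", "password")
  conn.setAutoCommit(false)
  val stmt = conn.prepareStatement("INSERT INTO target_table (id, payload) VALUES (?, ?)")
  try {
    var n = 0
    rows.foreach { row =>
      stmt.setLong(1, row.getLong(0))
      stmt.setString(2, row.getString(1))
      stmt.addBatch()
      n += 1
      if (n % 10000 == 0) { stmt.executeBatch(); conn.commit() } // flush every 10k rows
    }
    stmt.executeBatch()
    conn.commit()
  } finally {
    stmt.close()
    conn.close()
  }
}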

On Thu, Aug 20, 2015 at 11:03 PM, Michael Armbrust 
wrote:

> We will probably fix this in Spark 1.6
>
> https://issues.apache.org/jira/browse/SPARK-10040
>
> On Thu, Aug 20, 2015 at 5:18 AM, Aram Mkrtchyan <
> aram.mkrtchyan...@gmail.com> wrote:
>
>> We want to migrate our data (approximately 20M rows) from Parquet to
>> Postgres. When we use the DataFrame writer's jdbc method, the execution time
>> is very long; we have tried the same with a manual batch insert, which was
>> much more effective. Is it intentionally implemented that way?
>>
>
>


Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Aram Mkrtchyan
Hi,

I hope this will help you:

import org.apache.spark.sql.functions._
import org.joda.time.{DateTime, Days}
import sqlContext.implicits._
import java.sql.Timestamp

// date1 and date2 are your two date/timestamp values
val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")

// UDF returning the whole-day difference between the two timestamp columns
val dateDiff = udf[Int, Timestamp, Timestamp]((value1, value2) =>
  Days.daysBetween(new DateTime(value2.getTime), new DateTime(value1.getTime)).getDays)

df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()

Or you can write a SQL query using HiveQL's datediff function:
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
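
If you prefer the SQL route, a small sketch (assuming a HiveContext, and that
the dataframe from the snippet above is registered as a temporary table):

// register the dataframe and use Hive's datediff(enddate, startdate) UDF
df.registerTempTable("dates")
sqlContext.sql("SELECT day1, day2, datediff(day2, day1) AS diff FROM dates").show()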

On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel  wrote:

> One more update on this question: I am using Spark 1.4.1.
>
> I was just reading the documentation for Spark 1.5 (still in development) and I
> think there will be a new function, *datediff*, that will solve the issue. So
> please let me know if there is any workaround until Spark 1.5 is out :).
>
> pyspark.sql.functions.datediff(end, start)
>
> Returns the number of days from start to end.
>
> >>> df = sqlContext.createDataFrame([('2015-04-08', '2015-05-10')], ['d1', 'd2'])
> >>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
> [Row(diff=32)]
>
> New in version 1.5.
>
> On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel 
> wrote:
>
>> Apologies, I sent that too early by accident. The actual message is below.
>>
>> A dataframe has 2 date columns (datetime type) and I would like to add
>> another column that holds the difference between these two dates.
>> A dataframe snippet is below.
>>
>> new_df.show(5)
>> +-----------+----------+--------------+
>> |      PATID|   SVCDATE|next_diag_date|
>> +-----------+----------+--------------+
>> |12345655545|2012-02-13|    2012-02-13|
>> |12345655545|2012-02-13|    2012-02-13|
>> |12345655545|2012-02-13|    2012-02-27|
>> +-----------+----------+--------------+
>>
>>
>>
>> Here is what I have tried so far:
>>
>> -> new_df.withColumn('SVCDATE2', (new_df.next_diag_date - new_df.SVCDATE)).show()
>>    Error: DateType does not support numeric operations
>>
>> -> new_df.withColumn('SVCDATE2', (new_df.next_diag_date - new_df.SVCDATE).days).show()
>>    Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);
>>
>>
>> However, this simple Python code works fine in PySpark:
>>
>> from datetime import date
>> d0 = date(2008, 8, 18)
>> d1 = date(2008, 9, 26)
>> delta = d0 - d1
>> print (d0 - d1).days
>>
>> # -39
>>
>>
>> Any suggestions would be appreciated! Also, is there a way to add a new
>> column to a dataframe without using a column expression (e.g. as in pandas or
>> R: df$new_col = 'new col value')?
>>
>>
>> Thanks,
>> Dhaval
>>
>>
>>
>> On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel 
>> wrote:
>>
>>> new_df.withColumn('SVCDATE2',
>>> (new_df.next_diag_date-new_df.SVCDATE).days).show()
>>>
>>> +-----------+----------+--------------+
>>> |      PATID|   SVCDATE|next_diag_date|
>>> +-----------+----------+--------------+
>>> |12345655545|2012-02-13|    2012-02-13|
>>> |12345655545|2012-02-13|    2012-02-13|
>>> |12345655545|2012-02-13|    2012-02-27|
>>> +-----------+----------+--------------+
>>>
>>
>>
>


DataFrameWriter.jdbc is very slow

2015-08-20 Thread Aram Mkrtchyan
We want to migrate our data (approximately 20M rows) from Parquet to Postgres.
When we use the DataFrame writer's jdbc method, the execution time is very
long; we have tried the same with a manual batch insert, which was much more
effective. Is it intentionally implemented that way?


[Running Spark Applications with Chronos or Marathon]

2015-04-30 Thread Aram Mkrtchyan
Hi,

We want to have Marathon start and monitor Chronos, so that when a
Chronos-based Spark job fails, Marathon automatically restarts it within
Chronos. Will this approach work if we start Spark jobs as shell
scripts from Chronos or Marathon?


Re: Parallel actions from driver

2015-03-27 Thread Aram Mkrtchyan
Thanks Sean,

It works with Scala's parallel collections.
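
For the archives, a minimal sketch of the .par variant that replaced the
ExecutorService (events, saveAsParquetFile and EventTypes are from my original
code quoted below; this is how we ended up wiring it, not the only way):

// Trigger the independent save jobs concurrently from the driver.
// .par runs the closures on the driver's fork-join pool; Spark then
// schedules the resulting jobs concurrently.
Seq(1, 2, 3).par.foreach { t =>
  val commons = events.filter(_._1 == t).map(_._2.common)
  saveAsParquetFile(sqlContext, commons, s"$t/common")

  EventTypes.all.foreach { et =>
    val eventData = events.filter(ev => ev._1 == t && ev._2.eventType == et).map(_._2.data)
    saveAsParquetFile(sqlContext, eventData, s"$t/$et")
  }
}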

On Thu, Mar 26, 2015 at 11:35 PM, Sean Owen  wrote:

> You can do this much more simply, I think, with Scala's parallel
> collections (try .par). There's nothing wrong with doing this, no.
>
> Here, something is getting caught in your closure, maybe
> unintentionally, that's not serializable. It's not directly related to
> the parallelism.
>
> On Thu, Mar 26, 2015 at 3:54 PM, Aram Mkrtchyan
>  wrote:
> > Hi.
> >
> > I'm trying to trigger DataFrame's save method in parallel from my driver.
> > For that purpose I use an ExecutorService and Futures; here's my code:
> >
> >
> > val futures = Seq(1, 2, 3).map(t => pool.submit(new Runnable {
> >   override def run(): Unit = {
> >     val commons = events.filter(_._1 == t).map(_._2.common)
> >     saveAsParquetFile(sqlContext, commons, s"$t/common")
> >     EventTypes.all.foreach { et =>
> >       val eventData = events.filter(ev => ev._1 == t && ev._2.eventType == et).map(_._2.data)
> >       saveAsParquetFile(sqlContext, eventData, s"$t/$et")
> >     }
> >   }
> > }))
> > futures.foreach(_.get)
> >
> > It throws "Task is not Serializable" exception. Is it legal to use
> threads
> > in driver to trigger actions?
>


Parallel actions from driver

2015-03-26 Thread Aram Mkrtchyan
Hi.

I'm trying to trigger DataFrame's save method in parallel from my driver.
For that purpose I use an ExecutorService and Futures; here's my code:


val futures = Seq(1, 2, 3).map(t => pool.submit(new Runnable {
  override def run(): Unit = {
    val commons = events.filter(_._1 == t).map(_._2.common)
    saveAsParquetFile(sqlContext, commons, s"$t/common")
    EventTypes.all.foreach { et =>
      val eventData = events.filter(ev => ev._1 == t && ev._2.eventType == et).map(_._2.data)
      saveAsParquetFile(sqlContext, eventData, s"$t/$et")
    }
  }
}))
futures.foreach(_.get)

It throws "Task is not Serializable" exception. Is it legal to use threads
in driver to trigger actions?


Re: Apache Spark ALS recommendations approach

2015-03-18 Thread Aram Mkrtchyan
Thanks, gen, for the helpful post.

Thank you, Sean. We're currently exploring the world of recommendations
with Spark, and your posts are very helpful to us.
We've noticed that you're a co-author of "Advanced Analytics with Spark";
not to go too far off topic, but will it be finished soon?

On Wed, Mar 18, 2015 at 5:47 PM, Sean Owen  wrote:

> I don't think that you need to put the whole joined data set in
> memory. However, memory is unlikely to be the limiting factor; it's the
> massive shuffle.
>
> OK, you really do have a large recommendation problem if you're
> recommending for at least 7M users per day!
>
> My hunch is that it won't be fast enough to use the simple predict()
> or recommendProducts() method repeatedly. There was a proposal to make
> a recommendAll() method which you could crib
> (https://issues.apache.org/jira/browse/SPARK-3066), but that looks like
> it's still a work in progress, since the point there was to do more work to
> make it scale.
>
> You may consider writing a bit of custom code to do the scoring. For
> example cache parts of the item-factor matrix in memory on the workers
> and score user feature vectors in bulk against them.
>
> There's a different school of thought, which is to try to compute only
> what you need, on the fly, and cache it if you like. That is good in
> that it doesn't waste effort and makes the result fresh, but, of
> course, means creating or consuming some other system to do the
> scoring and getting *that* to run fast.
>
> You can also look into techniques like LSH for probabilistically
> guessing which tiny subset of all items are worth considering, but
> that's also something that needs building more code.
>
> I'm sure a couple people could chime in on that here but it's kind of
> a separate topic.
>
> On Wed, Mar 18, 2015 at 8:04 AM, Aram Mkrtchyan
>  wrote:
> > Thanks much for your reply.
> >
> > By "on the fly", do you mean caching the trained model and querying it
> > for each user, joined with the 30M products, when needed?
> >
> > Our question is more about the general approach: what if we have 7M DAU?
> > How do companies deal with that using Spark?
> >
> >
> > On Wed, Mar 18, 2015 at 3:39 PM, Sean Owen  wrote:
> >>
> >> Not just the join, but this means you're trying to compute 600
> >> trillion dot products. It will never finish fast. Basically: don't do
> >> this :) You don't in general compute all recommendations for all
> >> users, but recompute for a small subset of users that were or are
> >> likely to be active soon. (Or compute on the fly.) Is anything like
> >> that an option?
> >>
> >> On Wed, Mar 18, 2015 at 7:13 AM, Aram Mkrtchyan
> >>  wrote:
> >> > Trying to build a recommendation system using Spark MLlib's ALS.
> >> >
> >> > Currently, we're trying to pre-build recommendations for all users on a
> >> > daily basis. We're using simple implicit feedback and ALS.
> >> >
> >> > The problem is, we have 20M users and 30M products, and to call the main
> >> > predict() method we need the cartesian join of users and products,
> >> > which is huge, and it may take days to generate the join alone. Is
> >> > there a way to avoid the cartesian join and make the process faster?
> >> >
> >> > Currently we have 8 nodes with 64 GB of RAM each; I think that should be
> >> > enough for the data.
> >> >
> >> > val users: RDD[Int] = ???       // RDD with 20M userIds
> >> > val products: RDD[Int] = ???    // RDD with 30M productIds
> >> > val ratings: RDD[Rating] = ???  // RDD with all user->product feedbacks
> >> >
> >> > val model = new ALS().setRank(10).setIterations(10)
> >> >   .setLambda(0.0001).setImplicitPrefs(true)
> >> >   .setAlpha(40).run(ratings)
> >> >
> >> > val usersProducts = users.cartesian(products)
> >> > val recommendations = model.predict(usersProducts)
> >
> >
>
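
For anyone searching the archives later, a rough sketch of the bulk-scoring
idea above: broadcast the item-factor matrix and score each user's feature
vector against it on the workers. The helper below is our own illustration,
not an MLlib API; collecting 30M item factors to the driver is itself heavy,
so in practice the item matrix would have to be blocked into chunks.

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

// Hypothetical helper: top-K recommendations for every user, by brute-force
// scoring against a broadcast copy of the item factors.
def recommendAllUsers(model: MatrixFactorizationModel,
                      topK: Int): RDD[(Int, Array[(Int, Double)])] = {
  val itemFactors = model.productFeatures.collect()   // (itemId, factor vector)
  val bcItems = model.userFeatures.sparkContext.broadcast(itemFactors)

  model.userFeatures.mapValues { userVec =>
    bcItems.value
      .map { case (itemId, itemVec) =>
        // plain dot product between user and item factor vectors
        val score = userVec.zip(itemVec).map { case (u, i) => u * i }.sum
        (itemId, score)
      }
      .sortBy(-_._2)
      .take(topK)
  }
}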


Re: Apache Spark ALS recommendations approach

2015-03-18 Thread Aram Mkrtchyan
Thanks much for your reply.

By "on the fly", do you mean caching the trained model and querying it
for each user, joined with the 30M products, when needed?

Our question is more about the general approach: what if we have 7M DAU?
How do companies deal with that using Spark?


On Wed, Mar 18, 2015 at 3:39 PM, Sean Owen  wrote:

> Not just the join, but this means you're trying to compute 600
> trillion dot products. It will never finish fast. Basically: don't do
> this :) You don't in general compute all recommendations for all
> users, but recompute for a small subset of users that were or are
> likely to be active soon. (Or compute on the fly.) Is anything like
> that an option?
>
> On Wed, Mar 18, 2015 at 7:13 AM, Aram Mkrtchyan
>  wrote:
> > Trying to build a recommendation system using Spark MLlib's ALS.
> >
> > Currently, we're trying to pre-build recommendations for all users on a
> > daily basis. We're using simple implicit feedback and ALS.
> >
> > The problem is, we have 20M users and 30M products, and to call the main
> > predict() method we need the cartesian join of users and products,
> > which is huge, and it may take days to generate the join alone. Is
> > there a way to avoid the cartesian join and make the process faster?
> >
> > Currently we have 8 nodes with 64 GB of RAM each; I think that should be
> > enough for the data.
> >
> > val users: RDD[Int] = ???       // RDD with 20M userIds
> > val products: RDD[Int] = ???    // RDD with 30M productIds
> > val ratings: RDD[Rating] = ???  // RDD with all user->product feedbacks
> >
> > val model = new ALS().setRank(10).setIterations(10)
> >   .setLambda(0.0001).setImplicitPrefs(true)
> >   .setAlpha(40).run(ratings)
> >
> > val usersProducts = users.cartesian(products)
> > val recommendations = model.predict(usersProducts)
>




Apache Spark ALS recommendations approach

2015-03-18 Thread Aram Mkrtchyan
Trying to build a recommendation system using Spark MLlib's ALS.

Currently, we're trying to pre-build recommendations for all users on a daily
basis. We're using simple implicit feedback and ALS.

The problem is, we have 20M users and 30M products, and to call the main
predict() method we need the cartesian join of users and products, which is
huge, and it may take days to generate the join alone. Is there a way to avoid
the cartesian join and make the process faster?

Currently we have 8 nodes with 64 GB of RAM each; I think that should be enough
for the data.

val users: RDD[Int] = ???       // RDD with 20M userIds
val products: RDD[Int] = ???    // RDD with 30M productIds
val ratings: RDD[Rating] = ???  // RDD with all user->product feedbacks

val model = new ALS().setRank(10).setIterations(10)
  .setLambda(0.0001).setImplicitPrefs(true)
  .setAlpha(40).run(ratings)

val usersProducts = users.cartesian(products)
val recommendations = model.predict(usersProducts)