Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jungtaek Lim
Glad to help, Jacek.

I'm happy you're doing a similar thing, which means it could be pretty useful
for others as well. It looks like it might be good enough to contribute the state
source and sink. I'll sort out my code and submit a PR.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Thu, Jun 27, 2019 at 7:54 AM Jacek Laskowski  wrote:

> Hi Jungtaek,
>
> That's very helpful to have the state source. As a matter of fact I've
> just this week been working on a similar tool (!) and have been wondering
> how to recreate the schema of the state key and value. You've helped me a
> lot. Thanks.
>
> Jacek
>
> On Wed, 26 Jun 2019, 23:58 Jungtaek Lim,  wrote:
>
>> Hi,
>>
>> you could consider the state operator's partition number as the "max
>> parallelism", as parallelism can be reduced by applying coalesce. It would
>> effectively work similarly to key groups.
>>
>> If you're also considering offline query, there's a tool to manipulate
>> state which enables reading and writing state in structured streaming,
>> achieving rescaling and schema evolution.
>>
>> https://github.com/HeartSaVioR/spark-state-tools
>> (DISCLAIMER: I'm an author of this tool.)
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei 
>> wrote:
>>
>>> Thank you for your quick reply!
>>>
>>> Is there any plan to improve this?
>>>
>>> I asked this question during some investigation comparing state-of-the-art
>>> streaming systems, among which Flink and DataFlow allow changing the
>>> parallelism number, and from my knowledge of Spark Streaming, it seems it is
>>> also able to do that: if some “key interval” concept is used, then state
>>> can somehow be decoupled from the partition number by consistent hashing.
>>>
>>>
>>>
>>>
>>>
>>> Regards
>>>
>>> Jialei
>>>
>>>
>>>
>>> *From: *Jacek Laskowski 
>>> *Date: *Wednesday, June 26, 2019 at 11:00 AM
>>> *To: *"Rong, Jialei" 
>>> *Cc: *"user @spark" 
>>> *Subject: *Re: Change parallelism number in Spark Streaming
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> It's not allowed to change the number of partitions after your streaming
>>> query is started.
>>>
>>>
>>>
>>> The reason is exactly the number of state stores which is exactly the
>>> number of partitions (perhaps multiplied by the number of stateful
>>> operators).
>>>
>>>
>>>
>>> I think you'll even get a warning or an exception when you change it
>>> after restarting the query.
>>>
>>>
>>>
>>> The number of partitions is stored in a checkpoint location.
>>>
>>>
>>>
>>> Jacek
>>>
>>>
>>>
>>> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
>>> wrote:
>>>
>>> Hi Dear Spark Expert
>>>
>>>
>>>
>>> I’m curious about a question regarding Spark Streaming/Structured
>>> Streaming: does it allow changing the parallelism number (the default one
>>> or the one specified in a particular operator) in a stream that has a stateful
>>> transform/operator? Will this cause my checkpointed state to get
>>> messed up?
>>>
>>>
>>>
>>>
>>>
>>> Regards
>>>
>>> Jialei
>>>
>>>
>>>
>>>
>>
>> --
>> Name : Jungtaek Lim
>> Blog : http://medium.com/@heartsavior
>> Twitter : http://twitter.com/heartsavior
>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>
>

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


Re: Array[Byte] from BinaryFiles can not be deserialized on Spark Yarn mode

2019-06-26 Thread big data
Additional info about this problem:

The deserialization code looks like this:

public static Block deserializeFrom(byte[] bytes) {
    try {
        Block b = SerializationUtils.deserialize(bytes);
        System.out.println("b=" + b);
        return b;
    } catch (ClassCastException e) {
        System.out.println("ClassCastException");
        e.printStackTrace();
    } catch (IllegalArgumentException e) {
        System.out.println("IllegalArgumentException");
        e.printStackTrace();
    } catch (SerializationException e) {
        System.out.println("SerializationException");
        e.printStackTrace();
    }
    return null;
}


The Spark code is:

val fis = spark.sparkContext.binaryFiles("/folder/abc*.file")
val RDD = fis.map(x => {
  val content = x._2.toArray()
  val b = Block.deserializeFrom(content)
  ...
})


All of the code above runs successfully in Spark local mode, but when it is run in
Yarn cluster mode, the error happens.
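
One direction worth checking (an assumption on my side, not something confirmed in this
thread): the stack trace quoted below resolves the class through
sun.misc.Launcher$AppClassLoader, while on Yarn the application jar is typically served by
Spark's own executor classloader. A commonly suggested workaround is to deserialize with a
custom ObjectInputStream that resolves classes via the thread context classloader, roughly
like this (a sketch, not the thread author's code):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

public final class ContextClassLoaderDeserializer {

    // ObjectInputStream that looks classes up through the thread context classloader,
    // which on Spark executors usually points at the loader that knows the user jar.
    private static final class ContextAwareObjectInputStream extends ObjectInputStream {
        ContextAwareObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false,
                        Thread.currentThread().getContextClassLoader());
            } catch (ClassNotFoundException e) {
                return super.resolveClass(desc); // fall back to the default lookup
            }
        }
    }

    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                     new ContextAwareObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }
}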



On 2019/6/26 5:52 PM, big data wrote:

I use Apache Commons Lang3's SerializationUtils in the code:
SerializationUtils.serialize() to store a customized class as files on disk,
and SerializationUtils.deserialize(byte[]) to restore them again.

In Spark local mode, all serialized files can be deserialized normally and no
error happens. But when I copy these serialized files into HDFS and read them
from HDFS using Yarn mode, a SerializationException happens.

The stack trace is below:

org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: com..
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:227)
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:265)
at com.com...deserializeFrom(XXX.java:81)
at com.XXX.$$anonfun$3.apply(B.scala:157)
at com.XXX.$$anonfun$3.apply(B.scala:153)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com..
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:686)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:223)

I've checked the loaded byte[] from binaryFiles, and the bytes read locally and from HDFS
are the same. So why can it not be deserialized from HDFS?

The class that is reported as ClassNotFound can be used normally in the Spark code.





Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jacek Laskowski
Hi Jungtaek,

That's very helpful to have the state source. As a matter of fact I've just
this week been working on a similar tool (!) and have been wondering how to
recreate the schema of the state key and value. You've helped me a lot.
Thanks.

Jacek

On Wed, 26 Jun 2019, 23:58 Jungtaek Lim,  wrote:

> Hi,
>
> you could consider the state operator's partition number as the "max
> parallelism", as parallelism can be reduced by applying coalesce. It would
> effectively work similarly to key groups.
>
> If you're also considering offline query, there's a tool to manipulate
> state which enables reading and writing state in structured streaming,
> achieving rescaling and schema evolution.
>
> https://github.com/HeartSaVioR/spark-state-tools
> (DISCLAIMER: I'm an author of this tool.)
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei 
> wrote:
>
>> Thank you for your quick reply!
>>
>> Is there any plan to improve this?
>>
>> I asked this question during some investigation comparing state-of-the-art
>> streaming systems, among which Flink and DataFlow allow changing the
>> parallelism number, and from my knowledge of Spark Streaming, it seems it is
>> also able to do that: if some “key interval” concept is used, then state
>> can somehow be decoupled from the partition number by consistent hashing.
>>
>>
>>
>>
>>
>> Regards
>>
>> Jialei
>>
>>
>>
>> *From: *Jacek Laskowski 
>> *Date: *Wednesday, June 26, 2019 at 11:00 AM
>> *To: *"Rong, Jialei" 
>> *Cc: *"user @spark" 
>> *Subject: *Re: Change parallelism number in Spark Streaming
>>
>>
>>
>> Hi,
>>
>>
>>
>> It's not allowed to change the number of partitions after your streaming
>> query is started.
>>
>>
>>
>> The reason is exactly the number of state stores which is exactly the
>> number of partitions (perhaps multiplied by the number of stateful
>> operators).
>>
>>
>>
>> I think you'll even get a warning or an exception when you change it
>> after restarting the query.
>>
>>
>>
>> The number of partitions is stored in a checkpoint location.
>>
>>
>>
>> Jacek
>>
>>
>>
>> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
>> wrote:
>>
>> Hi Dear Spark Expert
>>
>>
>>
>> I’m curious about a question regarding Spark Streaming/Structured
>> Streaming: does it allow changing the parallelism number (the default one
>> or the one specified in a particular operator) in a stream that has a stateful
>> transform/operator? Will this cause my checkpointed state to get
>> messed up?
>>
>>
>>
>>
>>
>> Regards
>>
>> Jialei
>>
>>
>>
>>
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>


Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jacek Laskowski
Hi,

No idea. I've just begun exploring the current state of state management in
spark structured streaming. I'd not be surprised if what you're after were
not possible. Stateful stream processing in SSS is fairly young.

Jacek

On Wed, 26 Jun 2019, 21:48 Rong, Jialei,  wrote:

> Thank you for your quick reply!
>
> Is there any plan to improve this?
>
> I asked this question during some investigation comparing state-of-the-art
> streaming systems, among which Flink and DataFlow allow changing the
> parallelism number, and from my knowledge of Spark Streaming, it seems it is
> also able to do that: if some “key interval” concept is used, then state
> can somehow be decoupled from the partition number by consistent hashing.
>
>
>
>
>
> Regards
>
> Jialei
>
>
>
> *From: *Jacek Laskowski 
> *Date: *Wednesday, June 26, 2019 at 11:00 AM
> *To: *"Rong, Jialei" 
> *Cc: *"user @spark" 
> *Subject: *Re: Change parallelism number in Spark Streaming
>
>
>
> Hi,
>
>
>
> It's not allowed to change the number of partitions after your streaming
> query is started.
>
>
>
> The reason is exactly the number of state stores which is exactly the
> number of partitions (perhaps multiplied by the number of stateful
> operators).
>
>
>
> I think you'll even get a warning or an exception when you change it after
> restarting the query.
>
>
>
> The number of partitions is stored in a checkpoint location.
>
>
>
> Jacek
>
>
>
> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
> wrote:
>
> Hi Dear Spark Expert
>
>
>
> I’m curious about a question regarding Spark Streaming/Structured
> Streaming: does it allow changing the parallelism number (the default one
> or the one specified in a particular operator) in a stream that has a stateful
> transform/operator? Will this cause my checkpointed state to get
> messed up?
>
>
>
>
>
> Regards
>
> Jialei
>
>
>
>


Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Rong, Jialei
Fantastic, thanks!


From: Jungtaek Lim 
Date: Wednesday, June 26, 2019 at 2:59 PM
To: "Rong, Jialei" 
Cc: Jacek Laskowski , "user @spark" 
Subject: Re: Change parallelism number in Spark Streaming

Hi,

you could consider the state operator's partition number as the "max parallelism", as
parallelism can be reduced by applying coalesce. It would effectively work
similarly to key groups.

If you're also considering offline query, there's a tool to manipulate state 
which enables reading and writing state in structured streaming, achieving 
rescaling and schema evolution.

https://github.com/HeartSaVioR/spark-state-tools
(DISCLAIMER: I'm an author of this tool.)

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei  wrote:
Thank you for your quick reply!
Is there any plan to improve this?
I asked this question during some investigation comparing state-of-the-art
streaming systems, among which Flink and DataFlow allow changing the parallelism
number, and from my knowledge of Spark Streaming, it seems it is also able to do
that: if some “key interval” concept is used, then state can somehow be decoupled
from the partition number by consistent hashing.


Regards
Jialei

From: Jacek Laskowski <ja...@japila.pl>
Date: Wednesday, June 26, 2019 at 11:00 AM
To: "Rong, Jialei" 
Cc: "user @spark" mailto:user@spark.apache.org>>
Subject: Re: Change parallelism number in Spark Streaming

Hi,

It's not allowed to change the number of partitions after your streaming query
is started.

The reason is exactly the number of state stores which is exactly the number of 
partitions (perhaps multiplied by the number of stateful operators).

I think you'll even get a warning or an exception when you change it after 
restarting the query.

The number of partitions is stored in a checkpoint location.

Jacek

On Wed, 26 Jun 2019, 19:30 Rong, Jialei,  wrote:
Hi Dear Spark Expert

I’m curious about a question regarding Spark Streaming/Structured Streaming:
does it allow changing the parallelism number (the default one or the one
specified in a particular operator) in a stream that has a stateful
transform/operator? Will this cause my checkpointed state to get messed up?


Regards
Jialei



--
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jungtaek Lim
Hi,

you could consider the state operator's partition number as the "max parallelism",
as parallelism can be reduced by applying coalesce. It would
effectively work similarly to key groups.
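
For illustration, here's a minimal sketch of how that might look in Structured Streaming
(Scala; the rate source, the grouping key and coalesce(4) are illustrative assumptions on
my side, not taken from this thread):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("coalesce-after-state").getOrCreate()
import spark.implicits._

val counts = spark.readStream
  .format("rate")           // toy source, just for illustration
  .load()
  .groupBy($"value" % 100)  // stateful aggregation: its partition count is the "max parallelism"
  .count()
  .coalesce(4)              // fewer tasks, each handling several state partitions

counts.writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()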

If you're also considering offline query, there's a tool to manipulate
state which enables reading and writing state in structured streaming,
achieving rescaling and schema evolution.

https://github.com/HeartSaVioR/spark-state-tools
(DISCLAIMER: I'm an author of this tool.)

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei 
wrote:

> Thank you for your quick reply!
>
> Is there any plan to improve this?
>
> I asked this question during some investigation comparing state-of-the-art
> streaming systems, among which Flink and DataFlow allow changing the
> parallelism number, and from my knowledge of Spark Streaming, it seems it is
> also able to do that: if some “key interval” concept is used, then state
> can somehow be decoupled from the partition number by consistent hashing.
>
>
>
>
>
> Regards
>
> Jialei
>
>
>
> *From: *Jacek Laskowski 
> *Date: *Wednesday, June 26, 2019 at 11:00 AM
> *To: *"Rong, Jialei" 
> *Cc: *"user @spark" 
> *Subject: *Re: Change parallelism number in Spark Streaming
>
>
>
> Hi,
>
>
>
> It's not allowed to change the number of partitions after your streaming
> query is started.
>
>
>
> The reason is exactly the number of state stores which is exactly the
> number of partitions (perhaps multiplied by the number of stateful
> operators).
>
>
>
> I think you'll even get a warning or an exception when you change it after
> restarting the query.
>
>
>
> The number of partitions is stored in a checkpoint location.
>
>
>
> Jacek
>
>
>
> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
> wrote:
>
> Hi Dear Spark Expert
>
>
>
> I’m curious about a question regarding Spark Streaming/Structured
> Streaming: does it allow changing the parallelism number (the default one
> or the one specified in a particular operator) in a stream that has a stateful
> transform/operator? Will this cause my checkpointed state to get
> messed up?
>
>
>
>
>
> Regards
>
> Jialei
>
>
>
>

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


How to make sure that function is executed on each active executor?

2019-06-26 Thread Parag Chaudhari
Hi,

I am working on a use case where I want to perform some action on each
active executor of an application once. How can I run a function on each
active executor associated with the current Spark application?

num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
if num_executors > 0:
    dummyRDD = self.parallelize(range(num_executors), num_executors)
    dummyRDD.foreachPartition(functionfoo)
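
For reference, a self-contained sketch of the same approach as a standalone PySpark
script (function_foo here is a hypothetical placeholder for the per-executor action):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("per-executor-action").getOrCreate()
sc = spark.sparkContext

def function_foo(partition_iter):
    # Placeholder: whatever should happen once per executor would go here.
    pass

# getExecutorInfos() includes the driver, hence the "- 1".
num_executors = len(sc._jsc.sc().statusTracker().getExecutorInfos()) - 1
if num_executors > 0:
    dummy_rdd = sc.parallelize(range(num_executors), num_executors)
    dummy_rdd.foreachPartition(function_foo)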

Will it guarantee that function foo is executed on each active
executor? Or will it miss a few executors if there is more than one core per
executor?

I deeply appreciate your help and time.


Thanks,
Parag Chaudhari


Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Rong, Jialei
Thank you for your quick reply!
Is there any plan to improve this?
I asked this question during some investigation comparing state-of-the-art
streaming systems, among which Flink and DataFlow allow changing the parallelism
number, and from my knowledge of Spark Streaming, it seems it is also able to do
that: if some “key interval” concept is used, then state can somehow be decoupled
from the partition number by consistent hashing.


Regards
Jialei

From: Jacek Laskowski 
Date: Wednesday, June 26, 2019 at 11:00 AM
To: "Rong, Jialei" 
Cc: "user @spark" 
Subject: Re: Change parallelism number in Spark Streaming

Hi,

It's not allowed to change the number of partitions after your streaming query
is started.

The reason is exactly the number of state stores which is exactly the number of 
partitions (perhaps multiplied by the number of stateful operators).

I think you'll even get a warning or an exception when you change it after 
restarting the query.

The number of partitions is stored in a checkpoint location.

Jacek

On Wed, 26 Jun 2019, 19:30 Rong, Jialei,  wrote:
Hi Dear Spark Expert

I’m curious about a question regarding Spark Streaming/Structured Streaming:
does it allow changing the parallelism number (the default one or the one
specified in a particular operator) in a stream that has a stateful
transform/operator? Will this cause my checkpointed state to get messed up?


Regards
Jialei



Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jacek Laskowski
Hi,

It's not allowed to change the number of partitions after your streaming
query is started.

The reason is exactly the number of state stores which is exactly the
number of partitions (perhaps multiplied by the number of stateful
operators).

I think you'll even get a warning or an exception when you change it after
restarting the query.

The number of partitions is stored in a checkpoint location.
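
For illustration, a minimal sketch of where that number comes from in Structured Streaming
(a sketch under my own assumptions: toy rate source, console sink, illustrative paths; not
code from this thread). spark.sql.shuffle.partitions at the first run determines the number
of state store partitions, and the checkpoint then pins it for restarts:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("stateful-query")
  .config("spark.sql.shuffle.partitions", "8") // fixes the number of state stores at the first run
  .getOrCreate()

import spark.implicits._

val counts = spark.readStream
  .format("rate")
  .load()
  .groupBy($"value" % 10)  // stateful aggregation, backed by 8 state partitions
  .count()

counts.writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/stateful-query-checkpoint") // the partition count is recorded here
  .start()
  .awaitTermination()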

Jacek

On Wed, 26 Jun 2019, 19:30 Rong, Jialei,  wrote:

> Hi Dear Spark Expert
>
>
>
> I’m curious about a question regarding Spark Streaming/Structured
> Streaming: whether it allows to change parallelism number(the default one
> or the one specified in particular operator) in a stream having stateful
> transform/operator? Whether this will cause my checkpointed state get
> messed up?
>
>
>
>
>
> Regards
>
> Jialei
>
>
>


Change parallelism number in Spark Streaming

2019-06-26 Thread Rong, Jialei
Hi Dear Spark Expert

I’m curious about a question regarding Spark Streaming/Structured Streaming:
does it allow changing the parallelism number (the default one or the one
specified in a particular operator) in a stream that has a stateful
transform/operator? Will this cause my checkpointed state to get messed up?


Regards
Jialei



How to run spark on GPUs

2019-06-26 Thread Jorge Machado
Hi Guys, 

What is the currently recommended way to use GPUs on Spark?

Which scheduler should we use? Mesos or Kubernetes?

What are the approaches to follow until
https://issues.apache.org/jira/browse/SPARK-24615 is in place?

Thanks
Jorge 



check is empty efficiently

2019-06-26 Thread SNEHASISH DUTTA
Hi,
Which is more efficient?

This is already defined since 2.4.0:


def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
  plan.executeCollect().head.getLong(0) == 0
}

or

df.head(1).isEmpty


I am checking if a DF is empty and it is taking forever
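
For anyone trying this locally, a minimal comparison sketch (assuming Spark 2.4+ and an
existing SparkSession named spark; the empty DataFrame is only for illustration):

val df = spark.range(0).toDF("id")  // an empty DataFrame, just for the comparison

// Built-in since 2.4.0: runs limit(1).groupBy().count() under the hood.
val emptyViaIsEmpty = df.isEmpty

// The alternative from the question: pulls at most one row to the driver.
val emptyViaHead = df.head(1).isEmpty

println(s"isEmpty = $emptyViaIsEmpty, head(1).isEmpty = $emptyViaHead")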

Regards,
Snehasish


Re: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Nick Pentreath
Generally I would say tens is a bit low, while a few hundred or more starts to make
sense. Of course it depends a lot on the specific use case, item catalogue,
user experience / platform, etc.

On Wed, Jun 26, 2019 at 3:57 PM Steve Pruitt  wrote:

> I should have mentioned this is a synthetic dataset I create using some
> likelihood distributions of the rating values.  I am only experimenting /
> learning.  In practice though, the list of items is likely to be at least
> in the 10’s if not 100’s. Are even these item numbers too low?
>
>
>
> Thanks.
>
>
>
> -S
>
>
>
> *From:* Nick Pentreath 
> *Sent:* Wednesday, June 26, 2019 9:09 AM
> *To:* user@spark.apache.org
> *Subject:* Re: [EXTERNAL] - Re: Problem with the ML ALS algorithm
>
>
>
> If the number of items is indeed 4, then another issue is the rank of the
> factors defaults to 10. Setting the "rank" parameter < 4 will help.
>
>
>
> However, if you only have 4 items, then I would propose that using ALS (or
> any recommendation model in fact) is not really necessary. There is not
> really enough information as well as sparsity, to make collaborative
> filtering useful. And you could simply recommend all items a user has not
> rated and the result would be the same essentially.
>
>
>
>
>
> On Wed, Jun 26, 2019 at 3:03 PM Steve Pruitt  wrote:
>
> Number of users is 1055
>
> Number of items is 4
>
> Ratings values are either 120, 20, 0
>
>
>
>
>
> *From:* Nick Pentreath 
> *Sent:* Wednesday, June 26, 2019 6:03 AM
> *To:* user@spark.apache.org
> *Subject:* [EXTERNAL] - Re: Problem with the ML ALS algorithm
>
>
>
> This means that the matrix that ALS is trying to factor is not positive
> definite. Try increasing regParam (try 0.1, 1.0 for example).
>
>
>
> What does the data look like? e.g. number of users, number of items,
> number of ratings, etc?
>
>
>
> On Wed, Jun 26, 2019 at 12:06 AM Steve Pruitt 
> wrote:
>
> I get an inexplicable exception when trying to build an ALSModel with the
> implicit set to true.  I can’t find any help online.
>
>
>
> Thanks in advance.
>
>
>
> My code is:
>
>
>
> ALS als = new ALS()
>
> .setMaxIter(5)
>
> .setRegParam(0.01)
>
> .setUserCol("customer")
>
> .setItemCol("item")
>
> .setImplicitPrefs(true)
>
> .setRatingCol("rating");
>
> ALSModel model = als.fit(training);
>
>
>
> The exception is:
>
> org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6
> because A is not positive definite. Is A derived from a singular matrix
> (e.g. collinear column values)?
>
> at
> org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
> at
> org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
> at
> org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
>


RE: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Steve Pruitt
I should have mentioned this is a synthetic dataset I create using some 
likelihood distributions of the rating values.  I am only experimenting / 
learning.  In practice though, the list of items is likely to be at least in 
the 10’s if not 100’s. Are even these item numbers too low?

Thanks.

-S

From: Nick Pentreath 
Sent: Wednesday, June 26, 2019 9:09 AM
To: user@spark.apache.org
Subject: Re: [EXTERNAL] - Re: Problem with the ML ALS algorithm

If the number of items is indeed 4, then another issue is the rank of the 
factors defaults to 10. Setting the "rank" parameter < 4 will help.

However, if you only have 4 items, then I would propose that using ALS (or any 
recommendation model in fact) is not really necessary. There is not really 
enough information as well as sparsity, to make collaborative filtering useful. 
And you could simply recommend all items a user has not rated and the result 
would be the same essentially.


On Wed, Jun 26, 2019 at 3:03 PM Steve Pruitt <bpru...@opentext.com> wrote:
Number of users is 1055
Number of items is 4
Ratings values are either 120, 20, 0


From: Nick Pentreath <nick.pentre...@gmail.com>
Sent: Wednesday, June 26, 2019 6:03 AM
To: user@spark.apache.org
Subject: [EXTERNAL] - Re: Problem with the ML ALS algorithm

This means that the matrix that ALS is trying to factor is not positive 
definite. Try increasing regParam (try 0.1, 1.0 for example).

What does the data look like? e.g. number of users, number of items, number of 
ratings, etc?

On Wed, Jun 26, 2019 at 12:06 AM Steve Pruitt <bpru...@opentext.com> wrote:
I get an inexplicable exception when trying to build an ALSModel with the 
implicit set to true.  I can’t find any help online.

Thanks in advance.

My code is:

ALS als = new ALS()
    .setMaxIter(5)
    .setRegParam(0.01)
    .setUserCol("customer")
    .setItemCol("item")
    .setImplicitPrefs(true)
    .setRatingCol("rating");
ALSModel model = als.fit(training);

The exception is:
org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 
because A is not positive definite. Is A derived from a singular matrix (e.g. 
collinear column values)?
at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]


Re: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Nick Pentreath
If the number of items is indeed 4, then another issue is the rank of the
factors defaults to 10. Setting the "rank" parameter < 4 will help.

However, if you only have 4 items, then I would propose that using ALS (or
any recommendation model in fact) is not really necessary. There is not
really enough information as well as sparsity, to make collaborative
filtering useful. And you could simply recommend all items a user has not
rated and the result would be the same essentially.
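
For concreteness, a hedged sketch of how the two suggestions from this thread (a rank below
the number of items and a larger regParam) could be applied to the original Java snippet;
the concrete values are illustrative only, not a recommendation from the thread:

import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class AlsTuningSketch {
    public static ALSModel fit(Dataset<Row> training) {
        ALS als = new ALS()
                .setMaxIter(5)
                .setRank(3)       // default rank is 10; keep it below the 4 items
                .setRegParam(0.1) // larger regularization, per the earlier suggestion
                .setUserCol("customer")
                .setItemCol("item")
                .setImplicitPrefs(true)
                .setRatingCol("rating");
        return als.fit(training);
    }
}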


On Wed, Jun 26, 2019 at 3:03 PM Steve Pruitt  wrote:

> Number of users is 1055
>
> Number of items is 4
>
> Ratings values are either 120, 20, 0
>
>
>
>
>
> *From:* Nick Pentreath 
> *Sent:* Wednesday, June 26, 2019 6:03 AM
> *To:* user@spark.apache.org
> *Subject:* [EXTERNAL] - Re: Problem with the ML ALS algorithm
>
>
>
> This means that the matrix that ALS is trying to factor is not positive
> definite. Try increasing regParam (try 0.1, 1.0 for example).
>
>
>
> What does the data look like? e.g. number of users, number of items,
> number of ratings, etc?
>
>
>
> On Wed, Jun 26, 2019 at 12:06 AM Steve Pruitt 
> wrote:
>
> I get an inexplicable exception when trying to build an ALSModel with the
> implicit set to true.  I can’t find any help online.
>
>
>
> Thanks in advance.
>
>
>
> My code is:
>
>
>
> ALS als = new ALS()
>
> .setMaxIter(5)
>
> .setRegParam(0.01)
>
> .setUserCol("customer")
>
> .setItemCol("item")
>
> .setImplicitPrefs(true)
>
> .setRatingCol("rating");
>
> ALSModel model = als.fit(training);
>
>
>
> The exception is:
>
> org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6
> because A is not positive definite. Is A derived from a singular matrix
> (e.g. collinear column values)?
>
> at
> org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
> at
> org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
> at
> org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
>


RE: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Steve Pruitt
Number of users is 1055
Number of items is 4
Ratings values are either 120, 20, 0


From: Nick Pentreath 
Sent: Wednesday, June 26, 2019 6:03 AM
To: user@spark.apache.org
Subject: [EXTERNAL] - Re: Problem with the ML ALS algorithm

This means that the matrix that ALS is trying to factor is not positive 
definite. Try increasing regParam (try 0.1, 1.0 for example).

What does the data look like? e.g. number of users, number of items, number of 
ratings, etc?

On Wed, Jun 26, 2019 at 12:06 AM Steve Pruitt <bpru...@opentext.com> wrote:
I get an inexplicable exception when trying to build an ALSModel with the 
implicit set to true.  I can’t find any help online.

Thanks in advance.

My code is:

ALS als = new ALS()
    .setMaxIter(5)
    .setRegParam(0.01)
    .setUserCol("customer")
    .setItemCol("item")
    .setImplicitPrefs(true)
    .setRatingCol("rating");
ALSModel model = als.fit(training);

The exception is:
org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 
because A is not positive definite. Is A derived from a singular matrix (e.g. 
collinear column values)?
at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]


[SPARK-23153][K8s] Would be available in Spark 2.X ?

2019-06-26 Thread ERIC JOEL BLANCO-HERMIDA SANZ
Hi,

I’m using Spark 2.4.3 on K8s and would like to do what’s solved in
[SPARK-23153], that is, be able to download dependencies through --packages and
have the driver access them. Right now, in Spark 2.4.3, after the
spark-submit and the download of dependencies, the driver cannot access them.

I see that it is closed, but for Spark 3.0.0. Will it also be available for Spark 2.X?

Thanks!



