Re: Lemmatization using StanfordNLP in ML 2.0

2016-09-24 Thread Timur Shenkao
Hello, everybody!

Maybe it's not the cause of your problem, but I noticed this line in your
comments:
*java version "1.8.0_51"*

It's strongly advised to use Java 1.8.0_66 or later.
I use Java 1.8.0_101 myself.


On Tue, Sep 20, 2016 at 1:09 AM, janardhan shetty 
wrote:

> Yes Sujit I have tried that option as well.
> Also tried sbt assembly but hitting below issue:
>
> http://stackoverflow.com/questions/35197120/java-outofmemoryerror-on-sbt-assembly
>
> Just wondering if there is any clean approach to include StanfordCoreNLP
> classes in Spark ML?
>
>
> On Mon, Sep 19, 2016 at 1:41 PM, Sujit Pal  wrote:
>
>> Hi Janardhan,
>>
>> You need the classifier "models" attribute on the second entry for
>> stanford-corenlp to indicate that you want the models JAR, as shown below.
>> Right now you are importing two instances of stanford-corenlp JARs.
>>
>> libraryDependencies ++= {
>>   val sparkVersion = "2.0.0"
>>   Seq(
>> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
>> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
>> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" classifier "models",
>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>   )
>> }
>>
>> -sujit
>>
>>
>> On Sun, Sep 18, 2016 at 5:12 PM, janardhan shetty wrote:
>>
>>> Hi Sujit,
>>>
>>> Tried that option but same error:
>>>
>>> java version "1.8.0_51"
>>>
>>>
>>> libraryDependencies ++= {
>>>   val sparkVersion = "2.0.0"
>>>   Seq(
>>> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>>> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
>>> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
>>> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>>   )
>>> }
>>>
>>> Error:
>>>
>>> Exception in thread "main" java.lang.NoClassDefFoundError: edu/stanford/nlp/pipeline/StanfordCoreNLP
>>> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.apply(Lemmatizer.scala:37)
>>> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.apply(Lemmatizer.scala:33)
>>> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:88)
>>> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:87)
>>> at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1060)
>>> at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
>>> at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:45)
>>> at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:29)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>
>>>
>>>
>>> On Sun, Sep 18, 2016 at 2:21 PM, Sujit Pal 
>>> wrote:
>>>
 Hi Janardhan,

 Maybe try removing the string "test" from this line in your build.sbt?
 IIRC, the "test" scope restricts the models JAR to the test classpath only.

 "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier "models",

 -sujit


 On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty <
 janardhan...@gmail.com> wrote:

> Hi,
>
> I am trying to use lemmatization as a transformer and added the below to
> the build.sbt
>
>  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "com.google.protobuf" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test"
> classifier "models",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>
>
> Error:
> *Exception in thread "main" java.lang.NoClassDefFoundError:
> edu/stanford/nlp/pipeline/StanfordCoreNLP*
>
> I have tried other versions of this spark package.
>
> Any help is appreciated..
>


>>>
>>
>


Re: ideas on de duplication for spark streaming?

2016-09-24 Thread Jörn Franke
As Cody said, Spark alone is not going to solve this for you.
There are two issues you need to look at: duplicated (or multiplied) messages
processed by two different processes, and the failure of any component
(including the message broker). Keep in mind that duplicate messages can arrive
even weeks later (e.g., from experience: the message broker is restarted and
resends a message weeks later).
As said, a DHT can help, but implementing one yourself takes a lot of
(error-prone) effort.
You may want to look at (dedicated) Redis nodes. Redis has support for
partitioning, is very fast (but please create only one connection per node, not
one per lookup) and provides a lot of different data structures to solve your
problem (e.g. atomic counters).

> On 24 Sep 2016, at 08:49, kant kodali  wrote:
> 
> 
> Hi Guys,
> 
> I have bunch of data coming in to my spark streaming cluster from a message 
> queue(not kafka). And this message queue guarantees at least once delivery 
> only so there is potential that some of the messages that come in to the 
> spark streaming cluster are actually duplicates and I am trying to figure out 
> a best way to filter them ? I was thinking if I should have a hashmap as a 
> broadcast variable but then I saw that broadcast variables are read only. 
> Also instead of having a global hashmap variable across every worker node I 
> am thinking Distributed hash table would be a better idea. any suggestions on 
> how best I could approach this problem by leveraging the existing 
> functionality?
> 
> Thanks,
> kant


Spark 1.6.2 Concurrent append to a HDFS folder with different partition key

2016-09-24 Thread Shing Hing Man

I am trying to prototype using a single SQLContext instance to append
DataFrames, partitioned by a field, to the same HDFS folder from multiple
threads. (Each thread works with a DataFrame having a different partition
column value.)
I get this exception:

16/09/24 16:45:12 ERROR [ForkJoinPool-3-worker-13] InsertIntoHadoopFsRelation: Aborting job.
java.io.FileNotFoundException: File hdfs://localhost:9000/user/temp/person/_temporary/0/task_201609241645_0001_m_00/country=UK-1 does not exist.
 at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:644)
 at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:92)
 at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:702)
 at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:698)
 at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:698)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
 at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
 at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:149)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:106)
 at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
 at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
 at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

Below is my code.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ConcurrentAppends {

  val outputDir = "hdfs://localhost:9000/user/temp/person"

  def main(args: Array[String]): Unit = {
    val sqlContext = {
      val config = new SparkConf().setAppName("test").setIfMissing("spark.master", "local[*]")
      val sc = new SparkContext(config)
      new SQLContext(sc)
    }

    // Two threads append to the same partitioned folder concurrently.
    val futureA = Future(badAppend(sqlContext))
    val futureB = Future(badAppend(sqlContext))

    val result: Future[Long] = for {
      countA <- futureA
      countB <- futureB
    } yield countA + countB

    val timeout = 60.seconds
    val count = Await.result(result, timeout)

    println("Count=" + count)
  }

  /** Appends some rows to the person folder. */
  def badAppend(sqlContext: SQLContext): Long = {
    println(
      s"""
         |sqlContext=${sqlContext.hashCode()}
         |thread=${Thread.currentThread().getName}
         |""".stripMargin)

    val personsDF: DataFrame = persons(sqlContext)

    personsDF.write.partitionBy("country").mode(SaveMode.Append).save(outputDir)

    personsDF.count
  }

  /** @return a DataFrame of sample rows */
  def persons(sqlContext: SQLContext, rowsPerCountry: Int = 100): DataFrame = {
    val personSchema = StructType(
      Seq(
        StructField("name", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true),
        StructField("gender", StringType, nullable = true),
        StructField("country", StringType, nullable = true)
      )
    )

    val noOfCountry = 10

    val rows = for {
      countryIndex <- 0 until noOfCountry
      recIndex <- 0 until rowsPerCountry
    } yield Row(s"foo-$recIndex", 10, "male", s"UK-$countryIndex")

    val rdd = sqlContext.sparkContext.parallelize(rows)
    sqlContext.createDataFrame(rdd, personSchema)
  }
}

---

The above issue is mentioned in https://issues.apache.org/jira/browse/SPARK-10109, which is still open.
One way to allow concurrent appends is to use some sort of sharding, so that
different threads write to different folders and each write gets its own
temporary directory.
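For example, a rough sketch of that sharding idea (my own illustration, not a tested fix for SPARK-10109): each thread writes its country's rows straight into its own partition directory, so concurrent writers never share a _temporary directory.

{code}
import org.apache.spark.sql.{DataFrame, SaveMode}

// Each thread appends only its own country, bypassing partitionBy on the shared folder.
def shardedAppend(df: DataFrame, country: String): Long = {
  df.write
    .mode(SaveMode.Append)
    .parquet(s"$outputDir/country=$country")   // e.g. .../person/country=UK-1
  df.count
}
{code}

Reading the data back as a partitioned table then means pointing Spark at the parent folder so it still discovers the country= directories.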
It would be very much appreciated if someone would share a better solution.
Thanks in advance for any suggestions!
Shing


Re: ideas on de duplication for spark streaming?

2016-09-24 Thread Cody Koeninger
Spark alone isn't going to solve this problem, because you have no reliable
way of making sure a given worker has a consistent shard of the messages
seen so far, especially if there's an arbitrary amount of delay between
duplicate messages.  You need a DHT or something equivalent.

On Sep 24, 2016 1:49 AM, "kant kodali"  wrote:

> Hi Guys,
>
> I have bunch of data coming in to my spark streaming cluster from a
> message queue(not kafka). And this message queue guarantees at least once
> delivery only so there is potential that some of the messages that come in
> to the spark streaming cluster are actually duplicates and I am trying to
> figure out a best way to filter them ? I was thinking if I should have a
> hashmap as a broadcast variable but then I saw that broadcast variables are
> read only. Also instead of having a global hashmap variable across every
> worker node I am thinking Distributed hash table would be a better idea.
> any suggestions on how best I could approach this problem by leveraging the
> existing functionality?
>
> Thanks,
> kant
>


Re: Spark job fails as soon as it starts. Driver requested a total number of 168510 executor

2016-09-24 Thread Yash Sharma
We have too many (large) files: about 30k partitions with roughly 4 years'
worth of data, and we need to process the entire history in a one-time
monolithic job.

I would like to know how Spark decides the number of executors to request.
I've seen test cases where the max executor count is Integer.MAX_VALUE, and I
was wondering if we can compute an appropriate max executor count based on
the cluster resources.
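For what it's worth, here is a back-of-the-envelope sketch of such a cap; the per-node memory and core figures below are assumptions, only the 2 cores and 6758 MB per executor come from the log in this thread.

{code}
// Rough upper bound for spark.dynamicAllocation.maxExecutors on a 12-node cluster.
val nodes         = 12
val memPerNodeMB  = 120 * 1024   // usable YARN memory per node -- assumption
val coresPerNode  = 16           // assumption
val executorMemMB = 6758         // 6144 MB + 614 MB overhead, as in the YarnAllocator log
val executorCores = 2

val byMemory     = nodes * (memPerNodeMB / executorMemMB)
val byCores      = nodes * (coresPerNode / executorCores)
val maxExecutors = math.min(byMemory, byCores)   // pass as --conf spark.dynamicAllocation.maxExecutors=...
{code}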

I'd be happy to contribute back if I can get some info on how the executor
requests are computed.

Cheers


On Sat, Sep 24, 2016, 6:39 PM ayan guha  wrote:

> Do you have too many small files you are trying to read? Number of
> executors are very high
> On 24 Sep 2016 10:28, "Yash Sharma"  wrote:
>
>> Have been playing around with configs to crack this. Adding them here
>> where it would be helpful to others :)
>> Number of executors and timeout seemed like the core issue.
>>
>> {code}
>> --driver-memory 4G \
>> --conf spark.dynamicAllocation.enabled=true \
>> --conf spark.dynamicAllocation.maxExecutors=500 \
>> --conf spark.core.connection.ack.wait.timeout=6000 \
>> --conf spark.akka.heartbeat.interval=6000 \
>> --conf spark.akka.frameSize=100 \
>> --conf spark.akka.timeout=6000 \
>> {code}
>>
>> Cheers !
>>
>> On Fri, Sep 23, 2016 at 7:50 PM, 
>> wrote:
>>
>>> For testing purpose can you run with fix number of executors and try.
>>> May be 12 executors for testing and let know the status.
>>>
>>> Get Outlook for Android 
>>>
>>>
>>>
>>> On Fri, Sep 23, 2016 at 3:13 PM +0530, "Yash Sharma" 
>>> wrote:
>>>
>>> Thanks Aditya, appreciate the help.

 I had the exact thought about the huge number of executors requested.
 I am going with the dynamic executors and not specifying the number of
 executors. Are you suggesting that I should limit the number of executors
 when the dynamic allocator requests for more number of executors.

 Its a 12 node EMR cluster and has more than a Tb of memory.



 On Fri, Sep 23, 2016 at 5:12 PM, Aditya <
 aditya.calangut...@augmentiq.co.in> wrote:

> Hi Yash,
>
> What is your total cluster memory and number of cores?
> Problem might be with the number of executors you are allocating. The
> logs shows it as 168510 which is on very high side. Try reducing your
> executors.
>
>
> On Friday 23 September 2016 12:34 PM, Yash Sharma wrote:
>
>> Hi All,
>> I have a spark job which runs over a huge bulk of data with Dynamic
>> allocation enabled.
>> The job takes some 15 minutes to start up and fails as soon as it
>> starts*.
>>
>> Is there anything I can check to debug this problem. There is not a
>> lot of information in logs for the exact cause but here is some snapshot
>> below.
>>
>> Thanks All.
>>
>> * - by starts I mean when it shows something on the spark web ui,
>> before that its just blank page.
>>
>> Logs here -
>>
>> {code}
>> 16/09/23 06:33:19 INFO ApplicationMaster: Started progress reporter
>> thread with (heartbeat : 3000, initial allocation : 200) intervals
>> 16/09/23 06:33:27 INFO YarnAllocator: Driver requested a total number
>> of 168510 executor(s).
>> 16/09/23 06:33:27 INFO YarnAllocator: Will request 168510 executor
>> containers, each with 2 cores and 6758 MB memory including 614 MB 
>> overhead
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 22
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 19
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 18
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 12
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 11
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 20
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 15
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 7
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 8
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 16
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 21
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 6
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason
>> for non-existent executor 13
>> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason

Re: databricks spark-csv: linking coordinates are what?

2016-09-24 Thread Anastasios Zouzias
Hi Dan,

If you use spark <= 1.6, you can also do

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

to quickly link the spark-csv jars into the Spark shell. Otherwise, as Holden
suggested, you declare it in your Maven/sbt dependencies. The Spark folks assume
that their users have a good working knowledge of Maven/sbt; you might need to
read up on these before jumping into Spark.
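For example, with sbt the coordinates from the README translate into one line of build.sbt (a sketch; adjust the Scala suffix and versions to your setup):

{code}
// build.sbt
scalaVersion := "2.11.8"

// Explicit artifact for Scala 2.11, exactly as listed in the README:
libraryDependencies += "com.databricks" % "spark-csv_2.11" % "1.5.0"

// Or let sbt append the Scala suffix for you:
// libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"
{code}

With Maven, the same three values go into the groupId, artifactId and version elements of a dependency.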

Best,
Anastasios

On Fri, Sep 23, 2016 at 10:26 PM, Dan Bikle  wrote:

>

> >

> hello world-of-spark,
>
> I am learning spark today.
>
> I want to understand the spark code in this repo:
>
> https://github.com/databricks/spark-csv

>
> In the README.md I see this info:
>
> Linking
>
> You can link against this library in your program at the following
coordinates:
> Scala 2.10
>
> groupId: com.databricks
> artifactId: spark-csv_2.10
> version: 1.5.0
>
> Scala 2.11
>
> groupId: com.databricks
> artifactId: spark-csv_2.11
> version: 1.5.0
>
> I want to know how I can use the above info.
>
> The people who wrote spark-csv should give some kind of example, demo, or
context.
>
> My understanding of Linking is limited.
>
> I have some experience operating sbt which I learned from this URL:
>
>
http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications

>
> The above URL does not give me enough information so that I can link
spark-csv with spark.
>
> Question:
> How do I learn how to use the info in the Linking section of the
README.md of
> https://github.com/databricks/spark-csv

> ??
>

-- 
-- Anastasios Zouzias


Re: Spark job fails as soon as it starts. Driver requested a total number of 168510 executor

2016-09-24 Thread ayan guha
Do you have too many small files you are trying to read? The number of
executors is very high.
On 24 Sep 2016 10:28, "Yash Sharma"  wrote:

> Have been playing around with configs to crack this. Adding them here
> where it would be helpful to others :)
> Number of executors and timeout seemed like the core issue.
>
> {code}
> --driver-memory 4G \
> --conf spark.dynamicAllocation.enabled=true \
> --conf spark.dynamicAllocation.maxExecutors=500 \
> --conf spark.core.connection.ack.wait.timeout=6000 \
> --conf spark.akka.heartbeat.interval=6000 \
> --conf spark.akka.frameSize=100 \
> --conf spark.akka.timeout=6000 \
> {code}
>
> Cheers !
>
> On Fri, Sep 23, 2016 at 7:50 PM, 
> wrote:
>
>> For testing purpose can you run with fix number of executors and try. May
>> be 12 executors for testing and let know the status.
>>
>> Get Outlook for Android 
>>
>>
>>
>> On Fri, Sep 23, 2016 at 3:13 PM +0530, "Yash Sharma" 
>> wrote:
>>
>> Thanks Aditya, appreciate the help.
>>>
>>> I had the exact thought about the huge number of executors requested.
>>> I am going with the dynamic executors and not specifying the number of
>>> executors. Are you suggesting that I should limit the number of executors
>>> when the dynamic allocator requests for more number of executors.
>>>
>>> Its a 12 node EMR cluster and has more than a Tb of memory.
>>>
>>>
>>>
>>> On Fri, Sep 23, 2016 at 5:12 PM, Aditya >> co.in> wrote:
>>>
 Hi Yash,

 What is your total cluster memory and number of cores?
 Problem might be with the number of executors you are allocating. The
 logs shows it as 168510 which is on very high side. Try reducing your
 executors.


 On Friday 23 September 2016 12:34 PM, Yash Sharma wrote:

> Hi All,
> I have a spark job which runs over a huge bulk of data with Dynamic
> allocation enabled.
> The job takes some 15 minutes to start up and fails as soon as it
> starts*.
>
> Is there anything I can check to debug this problem. There is not a
> lot of information in logs for the exact cause but here is some snapshot
> below.
>
> Thanks All.
>
> * - by starts I mean when it shows something on the spark web ui,
> before that its just blank page.
>
> Logs here -
>
> {code}
> 16/09/23 06:33:19 INFO ApplicationMaster: Started progress reporter
> thread with (heartbeat : 3000, initial allocation : 200) intervals
> 16/09/23 06:33:27 INFO YarnAllocator: Driver requested a total number
> of 168510 executor(s).
> 16/09/23 06:33:27 INFO YarnAllocator: Will request 168510 executor
> containers, each with 2 cores and 6758 MB memory including 614 MB overhead
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 22
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 19
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 18
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 12
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 11
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 20
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 15
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 7
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 8
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 16
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 21
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 6
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 13
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 14
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 9
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 3
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 17
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 1
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 10
> 16/09/23 06:33:36 WARN YarnAllocator: Tried to get the loss reason for
> non-existent executor 4
> 16/09/23 

Re: Spark MLlib ALS algorithm

2016-09-24 Thread Nick Pentreath
The scale factor was only to scale up the number of ratings in the dataset
for performance testing purposes, to illustrate the scalability of Spark
ALS.

It is not something you would normally do on your training dataset.
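For reference, the pseudo-user trick described in the quoted question below boils down to a one-line flatMap; a sketch using the RDD-based MLlib Rating class, only useful for benchmarking as noted above.

{code}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.Rating

// For every (userId, productId, rating), emit (userId + i, productId, rating) for 0 <= i < s,
// multiplying the number of ratings by the scaling factor s.
def scaleRatings(ratings: RDD[Rating], s: Int): RDD[Rating] =
  ratings.flatMap(r => (0 until s).map(i => Rating(r.user + i, r.product, r.rating)))
{code}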
On Fri, 23 Sep 2016 at 20:07, Roshani Nagmote 
wrote:

> Hello,
>
> I was working on Spark MLlib ALS Matrix factorization algorithm and came
> across the following blog post:
>
>
> https://databricks.com/blog/2014/07/23/scalable-collaborative-filtering-with-spark-mllib.html
>
> Can anyone help me understanding what "s" scaling factor does and does it
> really give better performance? What's the significance of this?
> If we convert input data to scaledData with the help of "s", will it
> speedup the algorithm?
>
> Scaled data usage:
> *(For each user, we create pseudo-users that have the same ratings. That
> is, for every rating as (userId, productId, rating), we generate (userId+i,
> productId, rating) where 0 <= i < s and s is the scaling factor)*
>
> Also, this blogpost is for spark 1.1 and I am currently using 2.0
>
> Any help will be greatly appreciated.
>
> Thanks,
> Roshani
>


ideas on de duplication for spark streaming?

2016-09-24 Thread kant kodali

Hi Guys,
I have a bunch of data coming into my Spark Streaming cluster from a message
queue (not Kafka). This message queue guarantees at-least-once delivery only, so
some of the messages that reach the Spark Streaming cluster may be duplicates,
and I am trying to figure out the best way to filter them. I was thinking of
keeping a hashmap as a broadcast variable, but then I saw that broadcast
variables are read-only. Also, instead of having a global hashmap variable on
every worker node, I am thinking a distributed hash table would be a better
idea. Any suggestions on how best to approach this problem by leveraging the
existing functionality?

Thanks,
kant