Re: Spark Function setup and cleanup

2014-07-26 Thread Yosi Botzer
Thank you, but that doesn't answer my general question.

I might need to enrich my records using different datasources (or DB's)

So the general use case I need to support is to have some kind of Function
that has init() logic for creating a connection to the DB, queries the DB for
each record and enriches my input record with data from the DB, and uses some
kind of close() logic to close the connection.

I have implemented this kind of use case using Map/Reduce and I want to
know how I can do it with Spark.

Thanks


On Fri, Jul 25, 2014 at 6:24 AM, Yanbo Liang yanboha...@gmail.com wrote:

 You can refer this topic
 http://www.mapr.com/developercentral/code/loading-hbase-tables-spark


 2014-07-24 22:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com:

 In my case I want to reach HBase. For every record with a userId I want to
 get some extra information about the user and add it to the result record for
 further processing


 On Thu, Jul 24, 2014 at 9:11 AM, Yanbo Liang yanboha...@gmail.com
 wrote:

 If you want to connect to DB in program, you can use JdbcRDD (
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
 )


 2014-07-24 18:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com:

 Hi,

 I am using the Java api of Spark.

 I wanted to know if there is a way to run some code in a manner that is
 like the setup() and cleanup() methods of Hadoop Map/Reduce

 The reason I need it is because I want to read something from the DB
 according to each record I scan in my Function, and I would like to open
 the DB connection only once (and close it only once).

 Thanks







Re: Spark Function setup and cleanup

2014-07-26 Thread Sean Owen
Look at mapPartitions. Whereas map turns one value V1 into one value
V2, mapPartitions lets you turn one entire Iterator[V1] into one whole
Iterator[V2]. The function that does so can perform some
initialization at its start, then process all of the values, and
clean up at its end. This is how you mimic a Mapper, really.

The most literal translation of Hadoop MapReduce I can think of is:

Mapper: mapPartitions to turn many (K1,V1) into many (K2,V2)
(shuffle) groupByKey to turn that into (K2,Iterator[V2])
Reducer: mapPartitions to turn many (K2,Iterator[V2]) into many (K3,V3)

It's not necessarily optimal to do it this way -- especially the
groupByKey bit. You have more expressive power here and need not fit
it into this paradigm. But yes you can get the same effect as in
MapReduce, mostly from mapPartitions.
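
To make that concrete, here is a minimal sketch (not from the original message)
of the setup/query/cleanup pattern with mapPartitions. It assumes inputRdd is an
RDD[String] of user ids and a JDBC datasource; jdbcUrl, the table and column
names are placeholders.

    import java.sql.DriverManager

    val jdbcUrl = "jdbc:mysql://dbhost:3306/users"        // placeholder URL

    val enriched = inputRdd.mapPartitions { userIds =>
      // "setup()": open one connection per partition
      val conn = DriverManager.getConnection(jdbcUrl)
      val stmt = conn.prepareStatement("SELECT info FROM users WHERE id = ?")
      // query once per record and enrich it
      val out = userIds.map { id =>
        stmt.setString(1, id)
        val rs = stmt.executeQuery()
        val info = if (rs.next()) rs.getString(1) else ""
        rs.close()
        (id, info)
      }.toList              // materialize before closing the connection
      // "cleanup()": close the statement and connection once per partition
      stmt.close()
      conn.close()
      out.iterator
    }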

On Sat, Jul 26, 2014 at 8:52 AM, Yosi Botzer yosi.bot...@gmail.com wrote:
 Thank you, but that doesn't answer my general question.

 I might need to enrich my records using different datasources (or DB's)

 So the general use case I need to support is to have some kind of Function
 that has init() logic for creating connection to DB, query the DB for each
 records and enrich my input record with stuff from the DB, and use some kind
 of close() logic to close the connection.

 I have implemented this kind of use case using Map/Reduce and I want to know
 how can I do it with spark


Re: Emacs Setup Anyone?

2014-07-26 Thread Prashant Sharma
Normally any setup that has an inferior mode for the Scala REPL will also
support the Spark REPL (with little or no modification).

Apart from that, I personally use the Spark REPL by invoking
spark-shell in a shell inside Emacs, and I keep the Scala tags (etags) for
Spark loaded. With this setup it is fairly quick to do tag prediction
at point; it is not always accurate, but it is useful.

In case you are working on building this (an inferior mode for the Spark REPL)
for us, I can come up with a wishlist.



Prashant Sharma


On Sat, Jul 26, 2014 at 3:07 AM, Andrei faithlessfri...@gmail.com wrote:

 I have never tried the Spark REPL from within Emacs, but I remember that
 switching from normal Python to PySpark was as simple as changing the
 interpreter name at the beginning of the session. Ensime [1]
 (together with ensime-emacs [2]) seems like a good place to start. For
 example, take a look at ensime-sbt.el [3], which defines a number of
 Scala/SBT commands.

 [1]: https://github.com/ensime/ensime-server
 [2]: https://github.com/ensime/ensime-emacs
 [3]: https://github.com/ensime/ensime-emacs/blob/master/ensime-sbt.el




 On Thu, Jul 24, 2014 at 10:14 PM, Steve Nunez snu...@hortonworks.com
 wrote:

 Anyone out there have a good configuration for emacs? Scala-mode sort of
 works, but I’d love to see a fully-supported spark-mode with an inferior
 shell. Searching didn’t turn up much of anything.

 Any emacs users out there? What setup are you using?

 Cheers,
 - SteveN







How can I integrate spark cluster into my own program without using spark-submit?

2014-07-26 Thread Lizhengbing (bing, BIPA)
I want to use a Spark cluster through a Scala function, so I can integrate Spark
into my program directly.
For example:
  When I call a count function in my own program, the program will deploy the
function to the cluster, so I can get the result directly:
  def count() = {
    val master = "spark://mache123:7077"
    val appName = "control_test"
    val sc = new SparkContext(master, appName)
    val rdd = sc.textFile("hdfs://123d101suse11sp3:9000/netflix/netflix.test")
    val count = rdd.count
    System.out.println("rdd.count = " + count)
    count
  }


Fwd: Exception : org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table

2014-07-26 Thread Bilna Govind


 Hi,

 This is my first code in Shark 0.9.1. I am new to Spark and Shark, so I 
 don't know where I went wrong. It would be really helpful if someone out 
 there could troubleshoot the problem.
 First of all, I will give a glimpse of my code, which is developed in 
 IntelliJ IDEA. This code runs perfectly in the editor.
  
 *Code:*

 def main(args: Array[String]) {
   val sparkConf = new SparkConf().setAppName("SharkTest").setMaster("local")
     .set("spark.executor.memory", "8g")
     .set("spark.worker.memory", "8g")
     .set("spark.executor.uri", "http://IP/spark/spark-0.9.1.tar.gz")
     .set("spark.mesos.coarse", "true")
     .setJars(List(args(1) + "/shark-assembly-0.9.1-hadoop2.0.0-cdh4.5.0.jar"))
   val shc = SharkEnv.initWithSharkContext(sparkConf)
   val q1 = "CREATE EXTERNAL TABLE IF NOT EXISTS table1(startIpNum " +
     "string, endIpNum string, locId string) ROW FORMAT DELIMITED FIELDS TERMINATED " +
     "BY '" + args(3) + "' LOCATION '" + args(2) + "'"
   val q3 = "SELECT * FROM table1"
   shc.runSql(q1)
   shc.runSql(q3)
   shc.sql2rdd(q3).map { resultSet =>
     val y = resultSet.colname2indexMap.values
       .map(index => resultSet(index)).reduce((a, b) => a + "," + b)
     y
   }.saveAsTextFile(args(4))
   shc.sql("DROP TABLE IF EXISTS table1")
 }

 *build.sbt:*


 import AssemblyKeys._

 assemblySettings

 name := "appname"

 version := "1.0"

 scalaVersion := "2.10.3"

 mainClass := Some("classname")

 libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "0.9.1",
   "edu.berkeley.cs.shark" %% "shark" % "0.9.1",
   "org.apache.hive" % "hive-anttasks" % "0.11.0",
   "org.apache.hive" % "hive-beeline" % "0.11.0",
   "org.apache.hive" % "hive-cli" % "0.11.0",
   "org.apache.hive" % "hive-common" % "0.11.0",
   "org.apache.hive" % "hive-exec" % "0.11.0",
   "org.apache.hive" % "hive-hbase-handler" % "0.11.0",
   "org.apache.hive" % "hive-hwi" % "0.11.0",
   "org.apache.hive" % "hive-jdbc" % "0.11.0",
   "org.apache.hive" % "hive-metastore" % "0.11.0",
   "org.apache.hive" % "hive-serde" % "0.11.0",
   "org.apache.hive" % "hive-service" % "0.11.0",
   "org.apache.hive" % "hive-shims" % "0.11.0",
   "org.datanucleus" % "datanucleus-core" % "3.2.2",
   "org.datanucleus" % "datanucleus-rdbms" % "3.2.1",
   "org.datanucleus" % "datanucleus-api-jdo" % "3.2.1",
   "org.datanucleus" % "datanucleus-enhancer" % "3.1.1",
   "org.apache.derby" % "derby" % "10.10.1.1",
   "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.5.0")

 resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/",
   "Cloudera Repository" at
     "https://repository.cloudera.com/artifactory/cloudera-repos/")

 mergeStrategy in assembly := {
   case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
   case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
   case "log4j.properties" => MergeStrategy.discard
   case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
   case "reference.conf" => MergeStrategy.concat
   case _ => MergeStrategy.first
 }

 sbt assembly plugin version : 0.10.2

 The problem appears only when I try to create the jar of the code.

 Steps followed to create the jar:
 1. Sbt clean
 2. Sbt assembly

 When I try to run the jar using the command "java -jar jarName.jar 
 parameters", I get an "invalid or corrupt jar" error.
 The same jar is accepted when executed as "java -cp 
 jarName.jar classname parameters". But in this case a Hive exception 
 occurs: unable to fetch the table tablename.


 14/07/26 12:21:39 INFO Driver: PERFLOG method=TimeToSubmit
 14/07/26 12:21:39 INFO Driver: PERFLOG method=compile
 14/07/26 12:21:39 INFO ParseDriver: Parsing command: CREATE EXTERNAL TABLE 
 IF NOT EXISTS table1(startIpNum string,endIpNum string,locId string) ROW 
 FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 
 '/home/user/foldername/Input/SharkTest'
 14/07/26 12:21:39 INFO ParseDriver: Parse Completed
 14/07/26 12:21:40 INFO SharkSemanticAnalyzer: Starting Semantic Analysis
 14/07/26 12:21:40 INFO SharkSemanticAnalyzer: Creating table table1 
 position=36
 14/07/26 12:21:40 INFO HiveMetaStore: 0: Opening raw store with 
 implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
 14/07/26 12:21:40 INFO ObjectStore: ObjectStore, initialize called
 org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table 
 table1
 at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:957)
 at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:904)
 at 
 org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9328)
 at 
 org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8647)
 at 
 shark.parse.SharkSemanticAnalyzer.analyzeInternal(SharkSemanticAnalyzer.scala:105)
 at 
 org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:279)
 at shark.SharkDriver.compile(SharkDriver.scala:215)

 I would appreciate any comments about the cause of the above exception

 Regards,

 Bilna P




Help using streaming from Spark Shell

2014-07-26 Thread Yana Kadiyska
Hi,

I'm starting spark-shell like this:

SPARK_MEM=1g SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=3600
/spark/bin/spark-shell -c 3

but when I try to create a streaming context
val scc = new StreamingContext(sc, Seconds(10))

 I get:

org.apache.spark.SparkException: Spark Streaming cannot be used
without setting spark.cleaner.ttl; set this property before creating a
SparkContext (use SPARK_JAVA_OPTS for the shell)

at 
org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:121)


I also tried export SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=3600
before calling spark-shell but with no luck...

What am I doing wrong? This is spark 0.9.1 -- I cannot upgrade
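
For reference, a minimal sketch (not from the original message, and untested
against 0.9.1) of one way to set the property programmatically from the shell:
stop the shell's SparkContext and build a new one with spark.cleaner.ttl set
before creating the StreamingContext. The master URL and app name are
placeholders.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    sc.stop()                                        // stop the shell's default context
    val conf = new SparkConf()
      .setMaster("spark://master:7077")              // placeholder master URL
      .setAppName("streaming-from-shell")            // placeholder app name
      .set("spark.cleaner.ttl", "3600")              // must be set before the SparkContext exists
    val newSc = new SparkContext(conf)
    val ssc = new StreamingContext(newSc, Seconds(10))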


Lot of object serialization even with MEMORY_ONLY

2014-07-26 Thread lokesh.gidra
Hello,

I am executing the SparkPageRank example. It uses the cache() API for
persistence of RDDs, which, if I am not wrong, in turn uses the MEMORY_ONLY
storage level. However, the oprofile report shows a lot of activity in the
writeObject0 function.

There is not even a single "Spilling in-memory..." message in the
output/log. This is because I am using a huge heap size of 120GB.

Can someone please tell me why I see so much serialization happening,
even though the MEMORY_ONLY storage level is used? The Spark version I am
using is 1.0.1.


Thanks,
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Lot-of-object-serialization-even-with-MEMORY-ONLY-tp10722.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spilling in-memory... messages in log even with MEMORY_ONLY

2014-07-26 Thread lokesh.gidra
Hello,

I am running the SparkPageRank example, which uses the cache() API for persistence.
This, AFAIK, uses the MEMORY_ONLY storage level. But even in this setup, I see a
lot of "WARN ExternalAppendOnlyMap: Spilling in-memory map of ..." messages
in the log. Why is that? I thought MEMORY_ONLY means the RDD is kicked out
if there isn't enough memory available.


Thanks,
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


graphx cached partitions wont go away

2014-07-26 Thread Koert Kuipers
i have graphx queries running inside a service where i collect the results
to the driver and do not hold any references to the rdds involved in the
queries. my assumption was that with the references gone spark would go and
remove the cached rdds from memory (note, i did not cache them, graphx did).

yet they hang around...

is my understanding of how the ContextCleaner works incorrect? or could it
be that graphx holds some references internally to rdds, preventing garbage
collection? maybe even circular references?


Re: Spilling in-memory... messages in log even with MEMORY_ONLY

2014-07-26 Thread Matei Zaharia
These messages are actually not about spilling the RDD, they're about spilling 
intermediate state in a reduceByKey, groupBy or other operation whose state 
doesn't fit in memory. We have to do that in these cases to avoid going out of 
memory. You can minimize spilling by having more reduce tasks though, which 
will mean less data per task.
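
For illustration, a minimal sketch (not from the original message) of raising
reduce-side parallelism; contribs is a placeholder pair RDD and 200 is an
arbitrary task count.

    import org.apache.spark.SparkContext._   // implicit pair-RDD functions (pre-1.3 style)

    // contribs: RDD[(String, Double)] of per-key contributions (placeholder).
    // The second argument to reduceByKey sets the number of reduce tasks, so each
    // task aggregates less state and is less likely to spill to disk.
    val ranks = contribs.reduceByKey(_ + _, 200)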

Matei

On Jul 26, 2014, at 1:22 PM, lokesh.gidra lokesh.gi...@gmail.com wrote:

 Hello,
 
 I am running SparkPageRank example which uses cache() API for persistence.
 This AFAIK, uses MEMORY_ONLY storage level. But even in this setup, I see a
 lot of WARN ExternalAppendOnlyMap: Spilling in-memory map of messages
 in the log. Why is it so? I thought that MEMORY_ONLY means kick out the RDD
 if there isn't enough memory available.
 
 
 Thanks,
 Lokesh
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spilling in-memory... messages in log even with MEMORY_ONLY

2014-07-26 Thread lokesh.gidra
Thanks for the reply. I understand this now.

But in another situation, when I use a large heap size to avoid any spilling
(I confirm there are no spilling messages in the log), I still see a lot of
time being spent in the writeObject0() function. Can you please tell me why
there would be any serialization done?


Thanks
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10727.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spilling in-memory... messages in log even with MEMORY_ONLY

2014-07-26 Thread Matei Zaharia
Even in local mode, Spark serializes data that would be sent across the 
network, e.g. in a reduce operation, so that you can catch errors that would 
happen in distributed mode. You can make serialization much faster by using the 
Kryo serializer; see http://spark.apache.org/docs/latest/tuning.html. But it 
won't go away. Basically the code is not optimized for the very best 
performance on a single node, it's designed to make it easy to build your 
program locally and run it on a cluster without surprises.
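
For illustration, a minimal sketch (not from the original message) of enabling
Kryo as described in the tuning guide; the registrator class, its package and
the record class are placeholders.

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    case class MyRecord(id: Long, value: Double)         // placeholder data class

    class MyKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[MyRecord])                 // register frequently shuffled classes
      }
    }

    val conf = new SparkConf()
      .setAppName("pagerank")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "mypackage.MyKryoRegistrator")  // fully qualified name (placeholder package)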

Matei

On Jul 26, 2014, at 3:08 PM, lokesh.gidra lokesh.gi...@gmail.com wrote:

 Thanks for the reply. I understand this now.
 
 But in another situation, when I use large heap size to avoid any spilling
 (I confirm, there are no spilling messages in log), I still see a lot of
 time being spent in writeObject0() function. Can you please tell me why
 would there be any serialization done?
 
 
 Thanks
 Lokesh
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10727.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: streaming window not behaving as advertised (v1.0.1)

2014-07-26 Thread Tathagata Das
Yeah, maybe I should bump the issue to major. Now that I think about
my previous answer, this should be easy to fix just by doing a
foreachRDD on all the input streams within the system (rather than
asking you to do it explicitly, as I did).

Thanks Alan, for testing this out and confirming that this was the
same issue. I was worried that this is a totally new issue that we did
not know of.

TD

On Wed, Jul 23, 2014 at 12:37 AM, Alan Ngai a...@opsclarity.com wrote:
 TD, it looks like your instincts were correct.  I misunderstood what you
 meant.  If I force an eval on the inputstream using foreachRDD, the
 windowing will work correctly.  If I don’t do that, lazy eval somehow screws
 with window batches I eventually receive.  Any reason the bug is categorized
 as minor?  It seems that anyone who uses the windowing functionality would
 run into this bug.  I imagine this would include anyone who wants to use
 spark streaming to aggregate data in fixed time batches, which seems like a
 fairly common use case.

 Alan



 On Jul 22, 2014, at 11:30 PM, Alan Ngai a...@opsclarity.com wrote:

 foreachRDD is how I extracted values in the first place, so that’s not going
 to make a difference.  I don’t think it’s related to SPARK-1312 because I’m
 generating data every second in the first place and I’m using foreachRDD
 right after the window operation.  The code looks something like

 val batchInterval = 5
 val windowInterval = 25
 val slideInterval = 15

 val windowedStream = inputStream.window(Seconds(windowInterval),
   Seconds(slideInterval))

 val outputFunc = (r: RDD[MetricEvent], t: Time) => {
   println("%s".format(t.milliseconds / 1000))
   r.foreach { metric =>
     val timeKey = metric.timeStamp / batchInterval * batchInterval
     println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name,
       metric.value))
   }
 }
 testWindow.foreachRDD(outputFunc)

 On Jul 22, 2014, at 10:13 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 It could be related to this bug that is currently open.
 https://issues.apache.org/jira/browse/SPARK-1312

 Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) and
 try these combos again?

 TD


 On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote:

 I have a sample application pumping out records 1 per second.  The batch
 interval is set to 5 seconds.  Here’s a list of “observed window intervals”
 vs what was actually set

 window=25, slide=25 : observed-window=25, overlapped-batches=0
 window=25, slide=20 : observed-window=20, overlapped-batches=0
 window=25, slide=15 : observed-window=15, overlapped-batches=0
 window=25, slide=10 : observed-window=20, overlapped-batches=2
 window=25, slide=5 : observed-window=25, overlapped-batches=3

 can someone explain this behavior to me?  I’m trying to aggregate metrics
 by time batches, but want to skip partial batches.  Therefore, I’m trying to
 find a combination which results in 1 overlapped batch, but no combination I
 tried gets me there.

 Alan






Re: SparkContext startup time out

2014-07-26 Thread Anand Avati
I am bumping into this problem as well. I am trying to move to akka 2.3.x
from 2.2.x in order to port to Scala 2.11 -- only akka 2.3.x is available for
Scala 2.11. All 2.2.x akka versions work fine, and all 2.3.x akka versions give
the following exception in new SparkContext. Still investigating why...

  java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at akka.remote.Remoting.start(Remoting.scala:180)
  at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
  at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
  at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
  at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)




On Fri, May 30, 2014 at 6:33 AM, Pierre B 
pierre.borckm...@realimpactanalytics.com wrote:

 I was annoyed by this as well.
 It appears that just permuting the order of dependency inclusion solves this
 problem:

 first Spark, then your CDH Hadoop distro.

 HTH,

 Pierre



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-startup-time-out-tp1753p6582.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: java.lang.StackOverflowError when calling count()

2014-07-26 Thread Tathagata Das
Responses inline.

On Wed, Jul 23, 2014 at 4:13 AM, lalit1303 la...@sigmoidanalytics.com wrote:
 Hi,
 Thanks TD for your reply. I am still not able to resolve the problem for my
 use case.
 I have, let's say, 1000 different RDDs, and I am applying a transformation
 function to each RDD, and I want the output of all RDDs combined into a single
 output RDD. For this, I am doing the following:

 *Loop Start*
 tempRDD = jaRDD.rdd().repartition(1).mapPartitions().toJavaRDD();
 *//creating new rdd in every loop*
 outRDD = outRDD.union(tempRDD); *//keep joining RDD's to get the output into
 a single RDD*

 *//after every 10 iteration, in order to truncate the lineage*
 cachRDD = outRDD.cache();
 cachRDD.checkpoint();
 System.out.println(cachRDD.collect().size());
 outRDD = new JavaRDD<String>(cachRDD.rdd(), cachRDD.classTag());
 *Loop Ends*

 *//finally after whole computation*
 outRDD.saveAsTextFile(..)

 The above operations is overall slow, runs successfully when performed less
 iterations i.e. ~100. But, when the num of iterations in increased to ~1000,
 The whole job is taking more than *30 mins* and ultimately break down giving
 OutOfMemory error. The total size of data is around 1.4 MB. As of now, I am
 running the job on spark standalone mode with 2 cores and 2.9 GB memory.

I think this is happening because of how you are caching the output RDDs
that are being generated repeatedly. In every iteration, it is
building a new union RDD which contains the data of the previous
union RDD plus some new data. Since each of these union RDDs is
cached, the underlying data is being cached repeatedly. So the cached
sizes grow like this:
Iteration 1: union RDD: X MB    |  Total size cached: X MB
Iteration 2: union RDD: 2X MB   |  Total size cached: 3X MB
Iteration 3: union RDD: 3X MB   |  Total size cached: 6X MB
Iteration 4: union RDD: 4X MB   |  Total size cached: 10X MB
...

If you do the math, that is a quadratic increase in the size of the
data being processed and cached, wrt the # iterations. This leads to
both increase in run time and memory usage.


 I also observed that when the collect() operation is performed, the number of
 tasks keeps on increasing as the loop proceeds, e.g. 22 total tasks on the first
 collect(), then ~40 total tasks ... ~300 tasks for a single collect.
 Does this mean that all the operations are repeatedly performed, and the RDD
 lineage is not broken??

Same reason as above. Each union RDD is built by appending the
partitions of the previous union RDD plus the new set of partitions
(~22 partitions). So the Nth union RDD has N * 22 partitions, hence that
many tasks.
You could change this by also repartitioning when you want to
cache+checkpoint the union RDD (therefore,
outRDD.repartition(100).cache().checkpoint().count()).

And do you really need all the data to be collected at the driver? If
you are doing the cachRDD.collect() just to force the checkpoint,
then use cachRDD.count() instead.
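
To illustrate that suggestion, here is a minimal sketch (not from the original
message) of the loop in Scala, using repartition + cache + checkpoint and
count() to force materialization; rddChunks, transform() and the paths are
placeholders.

    sc.setCheckpointDir("hdfs:///tmp/checkpoints")        // placeholder path

    var outRDD: org.apache.spark.rdd.RDD[String] = sc.parallelize(Seq.empty[String])
    rddChunks.zipWithIndex.foreach { case (chunk, i) =>
      outRDD = outRDD.union(transform(chunk))             // transform() is a placeholder
      if ((i + 1) % 10 == 0) {                            // every 10 iterations, truncate the lineage
        outRDD = outRDD.repartition(100).cache()
        outRDD.checkpoint()
        outRDD.count()                                    // forces the checkpoint; no collect() at the driver
      }
    }
    outRDD.saveAsTextFile("hdfs:///tmp/output")           // placeholder path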


 Can you please elaborate on the point from your last post i.e. how to
 perform: *Create a modified RDD R` which has the same data as RDD R but
 does not have the lineage. This is done by creating a new BlockRDD using the
 ids of blocks of data representing the in-memory R*

Please refer to the lines in the function:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala#L74
What those lines do is save the data of the associated RDD to HDFS
files, and then create a new CheckpointRDD from the same files. Then
the dependency of the associated RDD is changed to use the new RDD.
This truncates the lineage because the associated RDD's parent is now
the new RDD, which has a very short lineage (links to checkpoint
files). And the previous dependencies (parent RDDs) are forgotten.

This implementation can be modified by forcing the data of the
associated RDD to be cached with StorageLevel.MEMORY_AND_DISK_2. And
then instead of CheckpointRDD, you can create a new BlockRDD (using
the names of the blocks that are used to cache the RDD), which is then
set as the new dependency. This is definitely a behind-the-public-API
implementation.



 -
 Lalit Yadav
 la...@sigmoidanalytics.com
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p10488.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark MLlib vs BIDMach Benchmark

2014-07-26 Thread DB Tsai
BIDMach is a CPU- and GPU-accelerated machine learning library, also from Berkeley.

https://github.com/BIDData/BIDMach/wiki/Benchmarks

They benchmarked against Spark 0.9, and they claimed that it's
significantly faster than Spark MLlib. In Spark 1.0, a lot of
performance optimization has been done, and sparse data is supported.
It will be interesting to see a new benchmark result.

Anyone familiar with BIDMach? Are they as fast as they claim?

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai