RE: Question about RDD cache, unpersist, materialization

2014-06-11 Thread Nick Pentreath
If you want to force materialization use .count()


Also if you can simply don't unpersist anything, unless you really need to free 
the memory 
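
For example, a rough sketch of that count() idea for the loop in the message
below (untested; cache the running union and count it before unpersisting each
input RDD):

    // inside the loop, after building the union for this iteration:
    rddUnion.cache()
    rddUnion.count()   // cheap action that materializes every partition of the union
    rdd.unpersist()    // now safe: the final saveAsTextFile reads the cached union
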
—
Sent from Mailbox

On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim
taeyun@innowireless.co.kr wrote:

 BTW, it is possible that rdd.first() does not compute all of the partitions.
 So, first() cannot be used for the situation below.
 -Original Message-
 From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
 Sent: Wednesday, June 11, 2014 11:40 AM
 To: user@spark.apache.org
 Subject: Question about RDD cache, unpersist, materialization
 Hi,
 What I seem to know about the RDD persistence API is as follows:
 - cache() and persist() are not actions. They only mark the RDD.
 - unpersist() is also not an action. It only removes the marking. But if the
 rdd is already in memory, it is unloaded.
 And there seems to be no API to forcefully materialize an RDD without
 requesting data via an action method, for example first().
 So, I am faced with the following scenario.
 {
     JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());  // create empty RDD for merging
     for (int i = 0; i < 10; i++)
     {
         JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
         rdd.cache();  // Since it will be used twice, cache.
         rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);  // Transform and save, rdd materializes
         rddUnion = rddUnion.union(rdd.map(...).filter(...));  // Do another transform to T and merge by union
         rdd.unpersist();  // Now it seems not needed. (But needed actually)
     }
     // Here, rddUnion actually materializes, and needs all 10 rdds that were
     // already unpersisted. So, rebuilding all 10 rdds will occur.
     rddUnion.saveAsTextFile(mergedFileName);
 }
 If rddUnion can be materialized before the rdd.unpersist() line and
 cache()d, the rdds in the loop will not be needed on
 rddUnion.saveAsTextFile().
 Now what is the best strategy?
 - Do not unpersist all 10 rdds in the loop.
 - Materialize rddUnion in the loop by calling a 'light' action, like
 first().
 - Give up and just rebuild/reload all 10 rdds when saving rddUnion.
 Is there some misunderstanding?
 Thanks.

Re: Spark Streaming not processing file with particular number of entries

2014-06-11 Thread praveshjain1991
Well, I was able to get it to work by running Spark over Mesos. But it looks
like a bug when running Spark alone.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-tp6694p7382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: little confused about SPARK_JAVA_OPTS alternatives

2014-06-11 Thread elyast
Hi,

I'm facing similar problem

According to: http://tachyon-project.org/Running-Spark-on-Tachyon.html

in order to allow the Tachyon client to connect to the Tachyon master in HA
mode you need to pass 2 system properties:

-Dtachyon.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181
-Dtachyon.usezookeeper=true

Previously I was doing it with SPARK_JAVA_OPTS

I am trying it this way:

spark-defaults.conf:
...
spark.executor.extraJavaOptions -Dtachyon.max.columns=1
-Dtachyon.usezookeeper=true
-Dtachyon.zookeeper.address=hadoop-zoo-1:2181,hadoop-zoo-2:2181,hadoop-zoo-3:2181
 
...

However, I am getting an exception that the connection string (the ZK string)
is not set:

14/06/11 06:32:15 INFO :
initialize(tachyon-ft://hadoop-ha-1:19998/tmp/users.txt, Configuration:
core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml).
Connecting to Tachyon: tachyon-ft://hadoop-ha-1:19998/tmp/users.txt
14/06/11 06:32:15 INFO : Trying to connect master @
hadoop-ha-1/15.253.91.167:19998
14/06/11 06:32:15 WARN : tachyon.home is not set. Using
/mnt/tachyon_default_home as the default value.
Exception in thread "main" java.lang.NullPointerException: connectionString
cannot be null
at
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
at
org.apache.curator.ensemble.fixed.FixedEnsembleProvider.init(FixedEnsembleProvider.java:39)
at
org.apache.curator.framework.CuratorFrameworkFactory$Builder.connectString(CuratorFrameworkFactory.java:176)
at
org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91)
at
org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:76)
at tachyon.LeaderInquireClient.init(LeaderInquireClient.java:48)
at tachyon.LeaderInquireClient.getClient(LeaderInquireClient.java:57)
at tachyon.master.MasterClient.getMasterAddress(MasterClient.java:96)


Any help appreciated; it's really a blocker for me.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/little-confused-about-SPARK-JAVA-OPTS-alternatives-tp5798p7383.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Problem in Spark Streaming

2014-06-11 Thread nilmish

I used these commands to show the GC timings : -verbose:gc
-XX:-PrintGCDetails -XX:+PrintGCTimeStamps

Following is the output I got on the standard output :

4.092: [GC 4.092: [ParNew: 274752K->27199K(309056K), 0.0421460 secs]
274752K->27199K(995776K), 0.0422720 secs] [Times: user=0.33 sys=0.11,
real=0.04 secs]

16.630: [GC 16.630: [ParNew: 301951K->17854K(309056K), 0.0686940 secs]
301951K->23624K(995776K), 0.0689110 secs] [Times: user=0.36 sys=0.05,
real=0.07 secs]

32.440: [GC 32.441: [ParNew: 292606K->14985K(309056K), 0.0206040 secs]
298376K->20755K(995776K), 0.0208320 secs] [Times: user=0.20 sys=0.00,
real=0.02 secs]

42.626: [GC 42.626: [ParNew: 289737K->15467K(309056K), 0.0138100 secs]
295507K->21237K(995776K), 0.0139830 secs] [Times: user=0.10 sys=0.00,
real=0.01 secs]

56.633: [GC 56.633: [ParNew: 290219K->17334K(309056K), 0.0170930 secs]
295989K->23105K(995776K), 0.0173130 secs] [Times: user=0.12 sys=0.01,
real=0.02 secs]

Can anyone help me to understand these messages related to GC?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Problem in Spark Streaming

2014-06-11 Thread vinay Bajaj
http://stackoverflow.com/questions/895444/java-garbage-collection-log-messages

http://stackoverflow.com/questions/16794783/how-to-read-a-verbosegc-output

I think this will help in understanding the logs.


On Wed, Jun 11, 2014 at 12:53 PM, nilmish nilmish@gmail.com wrote:


 I used these commands to show the GC timings : -verbose:gc
 -XX:-PrintGCDetails -XX:+PrintGCTimeStamps

 Following is the output I got on the standard output :

 4.092: [GC 4.092: [ParNew: 274752K->27199K(309056K), 0.0421460 secs]
 274752K->27199K(995776K), 0.0422720 secs] [Times: user=0.33 sys=0.11,
 real=0.04 secs]

 16.630: [GC 16.630: [ParNew: 301951K->17854K(309056K), 0.0686940 secs]
 301951K->23624K(995776K), 0.0689110 secs] [Times: user=0.36 sys=0.05,
 real=0.07 secs]

 32.440: [GC 32.441: [ParNew: 292606K->14985K(309056K), 0.0206040 secs]
 298376K->20755K(995776K), 0.0208320 secs] [Times: user=0.20 sys=0.00,
 real=0.02 secs]

 42.626: [GC 42.626: [ParNew: 289737K->15467K(309056K), 0.0138100 secs]
 295507K->21237K(995776K), 0.0139830 secs] [Times: user=0.10 sys=0.00,
 real=0.01 secs]

 56.633: [GC 56.633: [ParNew: 290219K->17334K(309056K), 0.0170930 secs]
 295989K->23105K(995776K), 0.0173130 secs] [Times: user=0.12 sys=0.01,
 real=0.02 secs]

 Can anyone help me to understand these messages related to GC?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7384.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Number of Spark streams in Yarn cluster

2014-06-11 Thread tnegi
Hi,

I am trying to get a sense of the number of streams we can process in parallel
on a Spark Streaming cluster (Hadoop YARN).
Is there any benchmark for this?
We need a large number of streams (original + transformed) to be processed in
parallel.
The number is approximately 30,.



thanks
Negi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Number-of-Spark-streams-in-Yarn-cluster-tp7386.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver

2014-06-11 Thread gaurav.dasgupta
Thanks Tobias for replying.

The problem was that I have to provide the dependency jars' paths to the
StreamingContext within the code. So providing all the jar paths resolved
my problem. Refer to the code snippet below:

JavaStreamingContext ssc = new JavaStreamingContext(args[0],
    "SparkStreamExample", new Duration(1000),
    System.getenv("SPARK_HOME"), new String[] {
        JavaStreamingContext.jarOfClass(SparkStreamExample.class)[0],
        "/usr/local/spark-0.9.1-bin-hadoop2/external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar",
        "/usr/lib/hbase/lib/zookeeper.jar",
        "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/zkclient-0.3.jar",
        "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar",
        "/usr/local/scala/lib/scala-library.jar",
        "/usr/local/shark-0.9.1-bin-hadoop2/lib_managed/jars/com.yammer.metrics/metrics-core/metrics-core-2.1.2.jar",
        "/usr/local/hbase.jar"});

The question is: isn't there any other way of doing this? The above approach
doesn't seem good to me. For example, what if I execute the application on
some other cluster where the dependency paths are different? It is also not
feasible to parameterize these jar paths as user arguments.

Any advice will be appreciated.

Regards,
Gaurav

On Mon, Jun 9, 2014 at 6:23 AM, Tobias Pfeiffer [via Apache Spark User
List] ml-node+s1001560n7216...@n3.nabble.com wrote:

 Gaurav,

 I am not sure that the * expands to what you expect it to.
 Normally bash expands * to a space-separated string, not a
 colon-separated one. Try specifying all the jars manually, maybe?

 Tobias

 On Thu, Jun 5, 2014 at 6:45 PM, Gaurav Dasgupta [hidden email] wrote:

  Hi,
 
  I have written my own custom Spark streaming code which connects to
 Kafka
  server and fetch data. I have tested the code on local mode and it is
  working fine. But when I am executing the same code on YARN mode, I am
  getting KafkaReceiver class not found exception. I am providing the
 Spark
  Kafka jar in the classpath and ensured that the path is correct for all
 the
  nodes in my cluster.
 
  I am using Spark 0.9.1 hadoop pre-built and is deployed on all the nodes
 (10
  node cluster) in the YARN cluster.
  I am using the following command to run my code on YARN mode:
 
  SPARK_YARN_MODE=true
 
 SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar

  SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar java -cp
 
 /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/

  SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic
  NewTestTable 1
 
  Below is the error message I am getting:
 
  14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task
 set
  2.0 with 1 tasks
  14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as
 TID
  70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
  14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0
 as
  2971 bytes in 2 ms
  14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task
 2.0:0)
  14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to
  java.lang.ClassNotFoundException
  java.lang.ClassNotFoundException:
  org.apache.spark.streaming.kafka.KafkaReceiver
  at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:247)
  at
 
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)

  at
  java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
  at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
  at
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
  at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
  at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
  at
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
  at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
  at
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
  at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
  at
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
  at
  java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
  at
 
 

Re: HDFS Server/Client IPC version mismatch while trying to access HDFS files using Spark-0.9.1

2014-06-11 Thread bijoy deb
Any suggestions from anyone?

Thanks
Bijoy


On Tue, Jun 10, 2014 at 11:46 PM, bijoy deb bijoy.comput...@gmail.com
wrote:

 Hi all,

 I have built Shark-0.9.1 with sbt using the command below:

 SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.6.0 sbt/sbt assembly

 My Hadoop cluster is also having version 2.0.0-mr1-cdh4.6.0.

 But when I try to execute the command below from the Spark shell, which reads
 a file from HDFS, I get an IPC version mismatch error ("Server IPC version 7
 cannot communicate with client version 4") on the
 org.apache.hadoop.hdfs.DFSClient class.

 scala> val s = sc.textFile("hdfs://host:port/test.txt")
 scala> s.count()
 14/06/10 23:42:59 WARN util.NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/06/10 23:42:59 WARN snappy.LoadSnappy: Snappy native library not loaded
 org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
 communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 at com.sun.proxy.$Proxy9.getProtocolVersion(Unknown Source)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
 at
 org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:238)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:203)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
 at
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
 at
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
 at
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)


 Apparently this error is because of a version mismatch of the hadoop-hdfs
 jar between the client (the one referred to by Spark) and the server (the
 Hadoop cluster). But what I don't understand is why there is a mismatch,
 since I had built Spark with the correct Hadoop version.

 Any suggestions would be highly appreciated.

 Thanks
 Bijoy



Re: Hanging Spark jobs

2014-06-11 Thread Daniel Darabos
These stack traces come from the stuck node? Looks like it's waiting on
data in BlockFetcherIterator. Waiting for data from another node. But you
say all other nodes were done? Very curious.

Maybe you could try turning on debug logging, and try to figure out what
happens in BlockFetcherIterator (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala).
I do not think it is supposed to get stuck indefinitely.

On Tue, Jun 10, 2014 at 8:22 PM, Hurwitz, Daniel dhurw...@ebay.com wrote:

  Hi,



 We are observing a recurring issue where our Spark jobs are hanging for
 several hours, even days, until we kill them.



 We are running Spark v0.9.1 over YARN.



 Our input is a list of edges of a graph on which we use Bagel to compute
 connected components using the following method:



  class CCMessage(var targetId: Long, var myComponentId: Long)
    extends Message[Long] with Serializable

  def compute(self: CC, msgs: Option[Array[CCMessage]], superstep: Int):
      (CC, Array[CCMessage]) = {

    val smallestComponentId =
      msgs.map(sq => sq.map(_.myComponentId).min).getOrElse(Long.MaxValue)

    val newComponentId = math.min(self.clusterID, smallestComponentId)

    val halt = (newComponentId == self.clusterID) || (superstep >= maxIters)

    self.active = if (superstep == 0) true else !halt

    val outGoingMessages =
      if (halt && superstep > 0) Array[CCMessage]()
      else self.edges.map(targetId => new CCMessage(targetId, newComponentId)).toArray

    self.clusterID = newComponentId

    (self, outGoingMessages)
  }



 Our output is a text file in which each line is a list of the node IDs in
 each component. The size of the output may be up to 6 GB.



 We see in the job tracker that most of the time jobs usually get stuck on
 the “saveAsTextFile” command, the final line in our code. In some cases,
 the job will hang during one of the iterations of Bagel during the
 computation of the connected components.



 Oftentimes, when we kill the job and re-execute it, it will finish
 successfully within an hour which is the expected duration. We notice that
 if our Spark jobs don’t finish after a few hours, they will never finish
 until they are killed, regardless of the load on our cluster.



 After consulting with our Hadoop support team, they noticed that after a
 particular hanging Spark job was running for 38 hours, all Spark processes
 on all nodes were completed except for one node which was running more than
 9 hours consuming very little CPU, then suddenly consuming 14s of CPU, then
 back to calm. Also, the other nodes were not relinquishing their resources
 until our Hadoop admin killed the process on that problematic node and
 suddenly the job finished and “success” was reported in the job tracker.
 The output seemed to be fine too. If it helps you understand the issue, the
 Hadoop admin suggested this was a Spark issue and sent us two stack dumps
 which I attached to this email: before killing the node’s Spark process
 (dump1.txt) and after (dump2.txt).



 Any advice on how to resolve this issue? How can we debug this?

  Thanks,

 ~Daniel





Spark SQL incorrect result on GROUP BY query

2014-06-11 Thread Pei-Lun Lee
Hi,

I am using spark 1.0.0 and found in spark sql some queries use GROUP BY
give weird results.
To reproduce, type the following commands in spark-shell connecting to a
standalone server:

case class Foo(k: String, v: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++
List.fill(300)(Foo("c", 3))
sc.makeRDD(rows).registerAsTable("foo")
sql("select k,count(*) from foo group by k").collect

the result will be something random like:
res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75],
[c,270], [4,56], [1,1])

and if I run the same query again, the new result will be correct:
sql("select k,count(*) from foo group by k").collect
res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

Should I file a bug?

--
Pei-Lun Lee


Normalizations in MLBase

2014-06-11 Thread Aslan Bekirov
Hi All,

I have to normalize a set of values in the range 0-500 to the [0-1] range.

Is there any util method in MLBase to normalize large set of data?

BR,
Aslan


RE: Spark SQL incorrect result on GROUP BY query

2014-06-11 Thread Cheng, Hao
That’s a good catch, but I think it’s suggested to use HiveContext currently.  
( https://github.com/apache/spark/tree/master/sql)

Catalyst$ sbt/sbt hive/console
case class Foo(k: String, v: Int)
val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++
List.fill(300)(Foo("c", 3))
sparkContext.makeRDD(rows).registerAsTable("foo")
sql("select k,count(*) from foo group by k").collect
res1: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

Cheng Hao
From: Pei-Lun Lee [mailto:pl...@appier.com]
Sent: Wednesday, June 11, 2014 6:01 PM
To: user@spark.apache.org
Subject: Spark SQL incorrect result on GROUP BY query

Hi,

I am using spark 1.0.0 and found in spark sql some queries use GROUP BY give 
weird results.
To reproduce, type the following commands in spark-shell connecting to a 
standalone server:

case class Foo(k: String, v: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++
List.fill(300)(Foo("c", 3))
sc.makeRDD(rows).registerAsTable("foo")
sql("select k,count(*) from foo group by k").collect

the result will be something random like:
res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75], [c,270], 
[4,56], [1,1])

and if I run the same query again, the new result will be correct:
sql("select k,count(*) from foo group by k").collect
res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

Should I file a bug?

--
Pei-Lun Lee


Re: Information on Spark UI

2014-06-11 Thread Daniel Darabos
About more succeeded tasks than total tasks:
 - This can happen if you have enabled speculative execution. Some
partitions can get processed multiple times.
 - More commonly, the result of the stage may be used in a later
calculation, and has to be recalculated. This happens if some of the
results were evicted from cache.


On Wed, Jun 11, 2014 at 2:23 AM, Shuo Xiang shuoxiang...@gmail.com wrote:

 Hi,
   Came up with some confusion regarding the information on SparkUI. The
 following info is gathered while factorizing a large matrix using ALS:
   1. some stages have more succeeded tasks than total tasks, which are
 displayed in the 5th column.
   2. duplicate stages with exactly same stageID (stage 1/3/7)
   3. Clicking into some stages, some executors cannot be addressed. Does
 that mean lost of executor or this does not matter?

   Any explanation are appreciated!





Re: Error During ReceivingConnection

2014-06-11 Thread Surendranauth Hiraman
It looks like this was due to another executor on a different node closing
the connection on its side. I found the entries below in the remote side's
logs.

Can anyone comment on why one ConnectionManager would close its connection
to another node and what could be tuned to avoid this? It did not have any
errors on its side.


This is from the ConnectionManager on the side shutting down the
connection, not the ConnectionManager that had the Connection Reset By
Peer.

14/06/10 18:51:14 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.125,45610)

14/06/10 18:51:14 INFO network.ConnectionManager: Removing
SendingConnection to ConnectionManagerId(172.16.25.125,45610)




On Wed, Jun 11, 2014 at 8:38 AM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 I have a somewhat large job (10 GB input data but generates about 500 GB
 of data after many stages).

 Most tasks completed but a few stragglers on the same node/executor are
 still active (but doing nothing) after about 16 hours.

 At about 3 to 4 hours in, the tasks that are hanging have the following in
 the work logs.

 Any idea what config to tweak for this?


 14/06/10 18:51:10 WARN network.ReceivingConnection: Error reading from
 connection to ConnectionManagerId(172.16.25.108,37693)
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcher.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
  at sun.nio.ch.IOUtil.read(IOUtil.java:224)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
  at
 org.apache.spark.network.ReceivingConnection.read(Connection.scala:534)
 at
 org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
  at java.lang.Thread.run(Thread.java:679)
 14/06/10 18:51:10 INFO network.ConnectionManager: Handling connection
 error on connection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 INFO network.ConnectionManager: Removing
 SendingConnection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found
 14/06/10 18:51:10 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found
 14/06/10 18:51:14 WARN network.ReceivingConnection: Error reading from
 connection to ConnectionManagerId(172.16.25.97,54918)
 java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcher.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
 at sun.nio.ch.IOUtil.read(IOUtil.java:224)
  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
 at org.apache.spark.network.ReceivingConnection.read(Connection.scala:534)
  at
 org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:679)
 14/06/10 18:51:14 INFO network.ConnectionManager: Handling connection
 error on connection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 INFO network.ConnectionManager: Removing
 SendingConnection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found
 14/06/10 18:51:14 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found

 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v 

Re: Information on Spark UI

2014-06-11 Thread Shuo Xiang
Daniel,
  Thanks for the explanation.


On Wed, Jun 11, 2014 at 8:57 AM, Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 About more succeeded tasks than total tasks:
  - This can happen if you have enabled speculative execution. Some
 partitions can get processed multiple times.
  - More commonly, the result of the stage may be used in a later
 calculation, and has to be recalculated. This happens if some of the
 results were evicted from cache.


 On Wed, Jun 11, 2014 at 2:23 AM, Shuo Xiang shuoxiang...@gmail.com
 wrote:

 Hi,
   Came up with some confusion regarding the information on SparkUI. The
 following info is gathered while factorizing a large matrix using ALS:
   1. some stages have more succeeded tasks than total tasks, which are
 displayed in the 5th column.
   2. duplicate stages with exactly same stageID (stage 1/3/7)
   3. Clicking into some stages, some executors cannot be addressed. Does
 that mean lost of executor or this does not matter?

   Any explanation are appreciated!






Re: Spark SQL incorrect result on GROUP BY query

2014-06-11 Thread Michael Armbrust
I'd try rerunning with master.  It is likely you are running into SPARK-1994
https://issues.apache.org/jira/browse/SPARK-1994.

Michael


On Wed, Jun 11, 2014 at 3:01 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 I am using spark 1.0.0 and found in spark sql some queries use GROUP BY
 give weird results.
 To reproduce, type the following commands in spark-shell connecting to a
 standalone server:

 case class Foo(k: String, v: Int)
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._
 val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++
 List.fill(300)(Foo("c", 3))
 sc.makeRDD(rows).registerAsTable("foo")
 sql("select k,count(*) from foo group by k").collect

 the result will be something random like:
 res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75],
 [c,270], [4,56], [1,1])

 and if I run the same query again, the new result will be correct:
 sql("select k,count(*) from foo group by k").collect
 res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

 Should I file a bug?

 --
 Pei-Lun Lee



Adding external jar to spark-shell classpath in spark 1.0

2014-06-11 Thread Ulanov, Alexander
Hi,

I am currently using spark 1.0 locally on Windows 7. I would like to use 
classes from external jar in the spark-shell. I followed the instruction in: 
http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCALrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E

I have set ADD_JARS="my.jar" SPARK_CLASSPATH="my.jar" in spark-shell.cmd but 
this didn't work.

I also tried running "spark-shell.cmd --jars my.jar --driver-class-path my.jar 
--driver-library-path my.jar" and it didn't work either.

I cannot load any class from my jar into spark shell. Btw my.jar contains a 
simple Scala class.

Best regards, Alexander


Re: Adding external jar to spark-shell classpath in spark 1.0

2014-06-11 Thread Marcelo Vanzin
Ah, not that it should matter, but I'm on Linux and you seem to be on
Windows... maybe there is something weird going on with the Windows
launcher?

On Wed, Jun 11, 2014 at 10:34 AM, Marcelo Vanzin van...@cloudera.com wrote:
 Just tried this and it worked fine for me:

 ./bin/spark-shell --jars jar1,jar2,etc,etc

 On Wed, Jun 11, 2014 at 10:25 AM, Ulanov, Alexander
 alexander.ula...@hp.com wrote:
 Hi,



 I am currently using spark 1.0 locally on Windows 7. I would like to use
 classes from external jar in the spark-shell. I followed the instruction in:
 http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCALrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E



 I have set ADD_JARS=”my.jar” SPARK_CLASSPATH=”my.jar” in spark-shell.cmd but
 this didn’t work.



 I also tried running “spark-shell.cmd --jars my.jar --driver-class-path
 my.jar --driver-library-path my.jar” and it didn’t work either.



 I cannot load any class from my jar into spark shell. Btw my.jar contains a
 simple Scala class.



 Best regards, Alexander



 --
 Marcelo



-- 
Marcelo


Re: pmml with augustus

2014-06-11 Thread Villu Ruusmann
Hello Spark/PMML enthusiasts,

It's pretty trivial to integrate the JPMML-Evaluator library with Spark. In
brief, take the following steps in your Spark application code:
1) Create a Java Map (arguments) that represents the input data record.
You need to specify a key-value mapping for every active MiningField. The
key type is org.jpmml.evaluator.FieldName. The value type could be String or
any Java primitive data type that can be converted to the requested PMML
type.
2) Obtain an instance of org.jpmml.evaluator.Evaluator. Invoke its
#evaluate(Map<FieldName, ?>) method using the argument map created in step
1.
3) Process the Java Map (results) that represents the output data record.

Putting it all together:
JavaRDD<Map<FieldName, String>> arguments = ...
final ModelEvaluator<?> modelEvaluator =
    (ModelEvaluator<?>) pmmlManager.getModelManager(null,
        ModelEvaluatorFactory.getInstance()); // See the JPMML-Evaluator documentation
JavaRDD<Map<FieldName, ?>> results = arguments.flatMap(
    new FlatMapFunction<Map<FieldName, String>, Map<FieldName, ?>>(){

  @Override
  public Iterable<Map<FieldName, ?>> call(Map<FieldName, String> arguments){
    Map<FieldName, ?> result = modelEvaluator.evaluate(arguments);
    return Collections.<Map<FieldName, ?>>singletonList(result);
  }
});

Of course, it's not very elegant to be using JavaRDD<Map<K, V>> here.
Maybe someone can give me a hint about making it look and feel more Spark-y?

Also, I would like to refute the earlier comment by @pacoid that
JPMML-Evaluator compares poorly against the Augustus and Zementis products.
First, JPMML-Evaluator fully supports PMML specification versions 3.0
through 4.2. I would specifically stress the support for PMML 4.2, which was
released just a few months ago. Second, JPMML is open source. Perhaps its
licensing terms could be more liberal, but it's nevertheless the most open
and approachable way of bringing Java and PMML together.


VR



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pmml-with-augustus-tp7313p7412.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Adding external jar to spark-shell classpath in spark 1.0

2014-06-11 Thread Ulanov, Alexander
Are you able to import any class from your jars within spark-shell?

-Original Message-
From: Marcelo Vanzin [mailto:van...@cloudera.com] 
Sent: Wednesday, June 11, 2014 9:36 PM
To: user@spark.apache.org
Subject: Re: Adding external jar to spark-shell classpath in spark 1.0

Ah, not that it should matter, but I'm on Linux and you seem to be on 
Windows... maybe there is something weird going on with the Windows launcher?

On Wed, Jun 11, 2014 at 10:34 AM, Marcelo Vanzin van...@cloudera.com wrote:
 Just tried this and it worked fine for me:

 ./bin/spark-shell --jars jar1,jar2,etc,etc

 On Wed, Jun 11, 2014 at 10:25 AM, Ulanov, Alexander 
 alexander.ula...@hp.com wrote:
 Hi,



 I am currently using spark 1.0 locally on Windows 7. I would like to 
 use classes from external jar in the spark-shell. I followed the instruction 
 in:
 http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCA
 LrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E



 I have set ADD_JARS=”my.jar” SPARK_CLASSPATH=”my.jar” in 
 spark-shell.cmd but this didn’t work.



 I also tried running “spark-shell.cmd --jars my.jar 
 --driver-class-path my.jar --driver-library-path my.jar” and it didn’t work 
 either.



 I cannot load any class from my jar into spark shell. Btw my.jar 
 contains a simple Scala class.



 Best regards, Alexander



 --
 Marcelo



--
Marcelo


RE: Adding external jar to spark-shell classpath in spark 1.0

2014-06-11 Thread Ulanov, Alexander
Could you elaborate on this? I don’t have an application, I just use spark 
shell.

From: Andrew Or [mailto:and...@databricks.com]
Sent: Wednesday, June 11, 2014 9:40 PM
To: user@spark.apache.org
Subject: Re: Adding external jar to spark-shell classpath in spark 1.0

This is a known issue: https://issues.apache.org/jira/browse/SPARK-1919. We 
haven't found a fix yet, but for now, you can workaround this by including your 
simple class in your application jar.

2014-06-11 10:25 GMT-07:00 Ulanov, Alexander 
alexander.ula...@hp.commailto:alexander.ula...@hp.com:
Hi,

I am currently using spark 1.0 locally on Windows 7. I would like to use 
classes from external jar in the spark-shell. I followed the instruction in: 
http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCALrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E

I have set ADD_JARS=”my.jar” SPARK_CLASSPATH=”my.jar” in spark-shell.cmd but 
this didn’t work.

I also tried running “spark-shell.cmd --jars my.jar --driver-class-path my.jar 
--driver-library-path my.jar” and it didn’t work either.

I cannot load any class from my jar into spark shell. Btw my.jar contains a 
simple Scala class.

Best regards, Alexander



Re: HDFS Server/Client IPC version mismatch while trying to access HDFS files using Spark-0.9.1

2014-06-11 Thread Marcelo Vanzin
The error is saying that your client libraries are older than what
your server is using (2.0.0-mr1-cdh4.6.0 is IPC version 7).

Try double-checking that your build is actually using that version
(e.g., by looking at the hadoop jar files in lib_managed/jars).

On Wed, Jun 11, 2014 at 2:07 AM, bijoy deb bijoy.comput...@gmail.com wrote:
 Any suggestions from anyone?

 Thanks
 Bijoy


 On Tue, Jun 10, 2014 at 11:46 PM, bijoy deb bijoy.comput...@gmail.com
 wrote:

 Hi all,

 I have build Shark-0.9.1 using sbt using the below command:

 SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.6.0 sbt/sbt assembly

 My Hadoop cluster is also having version 2.0.0-mr1-cdh4.6.0.

 But when I try to execute the below command from Spark shell,which reads a
 file from HDFS, I get the IPC version mismatch- IPC version 7 on server
 versus IPC version 4 on client error on org.apache.hadoop.hdfs.DFSClient
 class.


 scala> val s = sc.textFile("hdfs://host:port/test.txt")
 scala> s.count()
 14/06/10 23:42:59 WARN util.NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/06/10 23:42:59 WARN snappy.LoadSnappy: Snappy native library not loaded
 org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
 communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 at com.sun.proxy.$Proxy9.getProtocolVersion(Unknown Source)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
 at
 org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:238)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:203)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
 at
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
 at
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
 at
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)


 Apparently this error is because of version mismatch of the hadoop-hdfs
 jar between client (one referred by Spark) and server(hadoop cluster).But
 what I don't understand is why is this mismatch (since I had built Spark
 with the correct Hadoop version).

 Any suggestions would be highly appreciated.

 Thanks
 Bijoy





-- 
Marcelo


Re: Information on Spark UI

2014-06-11 Thread Shuo Xiang
Using MEMORY_AND_DISK_SER to persist the input RDD[Rating] seems to work
right for me now. I'm testing on a larger dataset and will see how it goes.
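
(For reference, a minimal sketch of that persist call, assuming the input
RDD[Rating] is named ratings:)

    import org.apache.spark.storage.StorageLevel
    ratings.persist(StorageLevel.MEMORY_AND_DISK_SER)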


On Wed, Jun 11, 2014 at 9:56 AM, Neville Li neville@gmail.com wrote:

 Does cache eviction affect disk storage level too? I tried cranking up
 replication but still seeing this.


 On Wednesday, June 11, 2014, Shuo Xiang shuoxiang...@gmail.com wrote:

 Daniel,
   Thanks for the explanation.


 On Wed, Jun 11, 2014 at 8:57 AM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 About more succeeded tasks than total tasks:
  - This can happen if you have enabled speculative execution. Some
 partitions can get processed multiple times.
  - More commonly, the result of the stage may be used in a later
 calculation, and has to be recalculated. This happens if some of the
 results were evicted from cache.


 On Wed, Jun 11, 2014 at 2:23 AM, Shuo Xiang shuoxiang...@gmail.com
 wrote:

 Hi,
   Came up with some confusion regarding the information on SparkUI. The
 following info is gathered while factorizing a large matrix using ALS:
   1. some stages have more succeeded tasks than total tasks, which are
 displayed in the 5th column.
   2. duplicate stages with exactly same stageID (stage 1/3/7)
   3. Clicking into some stages, some executors cannot be addressed.
 Does that mean lost of executor or this does not matter?

   Any explanation are appreciated!







Powered by Spark addition

2014-06-11 Thread Derek Mansen
Hello, I was wondering if we could add our organization to the Powered by
Spark page. The information is:

Name: Vistar Media
URL: www.vistarmedia.com
Description: Location technology company enabling brands to reach on-the-go
consumers.

Let me know if you need anything else.

Thanks!
Derek Mansen


Re: Having trouble with streaming (updateStateByKey)

2014-06-11 Thread Michael Campbell
I rearranged my code to do a reduceByKey, which I think is working.  I also
don't think the problem was the updateState call, but something else;
unfortunately I changed a lot while looking for this issue, so I'm not sure
what the actual fix might have been, but I think it's working now.
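
In case it helps, a rough sketch of the reduceByKey arrangement (not my exact
code; it assumes a DStream named pairs holding the (source, destination)
address tuples shown in the quoted message below):

    import org.apache.spark.streaming.StreamingContext._   // pair DStream ops

    val counts = pairs
      .map(p => (p, 1))        // key each (src, dst) pair
      .reduceByKey(_ + _)      // per-batch counts, no state carried across batches
    counts.print()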


On Wed, Jun 11, 2014 at 1:47 PM, Michael Campbell 
michael.campb...@gmail.com wrote:

 I'm having a little trouble getting an updateStateByKey() call to work;
 was wondering if anyone could help.

 In my chain of calls from getting Kafka messages out of the queue to
 converting the message to a set of things, then pulling out 2 attributes
 of those things to a Tuple2, everything works.

 So what I end up with is about a 1 second dump of things like this (this
 is crufted up data, but it's basically 2 IPV6 addresses...)

 ---
 Time: 1402507839000 ms
 ---
 (:::a14:b03,:::a0a:2bd4)
 (:::a14:b03,:::a0a:2bd4)
 (:::a0a:25a7,:::a14:b03)
 (:::a14:b03,:::a0a:2483)
 (:::a14:b03,:::a0a:2480)
 (:::a0a:2d96,:::a14:b03)
 (:::a0a:abb5,:::a14:100)
 (:::a0a:dcd7,:::a14:28)
 (:::a14:28,:::a0a:da94)
 (:::a14:b03,:::a0a:2def)
 ...


 This works ok.

 The problem is when I add a call to updateStateByKey - the streaming app
 runs and runs and runs and never outputs anything.  When I debug, I can't
 confirm that my state update passed-in function is ever actually getting
 called.

 Indeed I have breakpoints and println statements in my updateFunc and it
 LOOKS like it's never getting called.  I can confirm that the
 updateStateByKey function IS getting called (via it stopping on a
 breakpoint).

 Is there something obvious I'm missing?



Kafka client - specify offsets?

2014-06-11 Thread Michael Campbell
Is there a way in the Apache Spark Kafka Utils to specify an offset to
start reading?  Specifically, from the start of the queue, or failing that,
a specific point?


Re: Normalizations in MLBase

2014-06-11 Thread DB Tsai
Hi Aslan,

Currently, we don't have a utility function to do so. However, you
can easily implement this with another map transformation. I'm working
on this feature now, and there will be a couple of different
normalization options users can choose from.
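
For example, a minimal sketch of min-max scaling with a plain map, assuming an
RDD[Double] named values whose range is known to be [0, 500]:

    val min = 0.0
    val max = 500.0
    val normalized = values.map(v => (v - min) / (max - min))  // now in [0, 1]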

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, Jun 11, 2014 at 6:25 AM, Aslan Bekirov aslanbeki...@gmail.com wrote:
 Hi All,

 I have to normalize a set of values in the range 0-500 to the [0-1] range.

 Is there any util method in MLBase to normalize large set of data?

 BR,
 Aslan


Re: json parsing with json4s

2014-06-11 Thread Michael Cutler
Hello,

You're absolutely right: the syntax you're using returns the json4s
value objects, not native types like Int, Long, etc. Fix that problem and
then everything else (filters) will work as you expect.  This is a short
snippet of a larger example: [1]

val lines = sc.textFile("likes.json")
val user_interest = lines.map(line => {
  // Parse the JSON, returns RDD[JValue]
  parse(line)
}).map(json => {
  // Extract the values we need to populate the UserInterest class
  implicit lazy val formats = org.json4s.DefaultFormats
  val name = (json \ "name").extract[String]
  val location_x = (json \ "location" \ "x").extract[Double]
  val location_y = (json \ "location" \ "y").extract[Double]
  val likes = (json \ "likes").extract[Seq[String]].map(_.toLowerCase()).mkString(";")
  ( UserInterest(name, location_x, location_y, likes) )
})


The key parts are implicit lazy val formats = org.json4s.DefaultFormats
being defined before you mess with the JSON, and (json \ "location" \ "x").
extract[Double] to extract the parts you need.
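
Applied to your snippet below, that would look something like this (a sketch;
log_file is your input path):

    val p = sc.textFile(log_file)
      .map(line => parse(line))
      .map { json =>
        implicit lazy val formats = org.json4s.DefaultFormats
        val age    = (json \ "person" \ "age").extract[Int]
        val typeId = (json \ "Action" \ "Content" \ "TypeID").extract[Int]
        (age, typeId)
      }

    p.filter(tup => tup._1 > 20)  // compiles now that the values are plain Ints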

One thing to be wary of is if your JSON is not consistent, i.e. fields are
not always set -- then the extract[Double] method will raise
exceptions. In that case you may wish to pull out the
values as a String and process them yourself, e.g.

val id = compact(render(json \ "facebook" \ "id"))

Good luck playing with JSON and Spark!  :o)

Best,

MC


[1] UserInterestsExample.scala
https://gist.github.com/cotdp/b471cfff183b59d65ae1





On 11 June 2014 23:26, SK skrishna...@gmail.com wrote:

 I have the following piece of code that parses a json file and extracts the
 age and TypeID

 val p = sc.textFile(log_file)
   .map(line => { parse(line) })
   .map(json =>
      {  val v1 = json \ "person" \ "age"
         val v2 = json \ "Action" \ "Content" \ "TypeID"
         (v1, v2)
      }
 )

 p.foreach(r = println(r))

 The result is:

 (JInt(12),JInt(5))
 (JInt(32),JInt(6))
 (JInt(40),JInt(7))

 1) How can I extract the values (i.e. without the JInt) ? I tried returning
 (v1.toInt, v2.toInt) from the map but got a compilation error stating that
 toInt is not a valid operation.

 2) I would also like to know how  I can filter the above tuples based on
 the
 age values. For e.g. I added the following after the second map operation:

   p.filter(tup => tup._1 > 20)

 I got a compilation error: value > is not a member of org.json4s.JValue

 Thanks for your help.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/json-parsing-with-json4s-tp7430.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Powered by Spark addition

2014-06-11 Thread Matei Zaharia
Alright, added you.

Matei

On Jun 11, 2014, at 1:28 PM, Derek Mansen de...@vistarmedia.com wrote:

 Hello, I was wondering if we could add our organization to the Powered by 
 Spark page. The information is:
 
 Name: Vistar Media
 URL: www.vistarmedia.com
 Description: Location technology company enabling brands to reach on-the-go 
 consumers.
 
 Let me know if you need anything else.
 
 Thanks!
 Derek Mansen



Re: Not fully cached when there is enough memory

2014-06-11 Thread Xiangrui Meng
Could you try to click on that RDD and see the storage info per
partition? I tried continuously caching RDDs, so new ones kick old
ones out when there is not enough memory. I saw similar glitches but
the storage info per partition is correct. If you find a way to
reproduce this error, please create a JIRA. Thanks! -Xiangrui


Re: Using Spark on Data size larger than Memory size

2014-06-11 Thread Allen Chang
Thanks. We've run into timeout issues at scale as well. We were able to
workaround them by setting the following JVM options:

-Dspark.akka.askTimeout=300
-Dspark.akka.timeout=300
-Dspark.worker.timeout=300

NOTE: these JVM options *must* be set on worker nodes (and not just the
driver/master) for the settings to take.

Allen





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-on-Data-size-larger-than-Memory-size-tp6589p7435.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: problem starting the history server on EC2

2014-06-11 Thread zhen
I tried everything including sudo, but it still did not work using the local
directory.
However, I finally got it working by getting the history server to log to
HDFS.
I first created a directory in HDFS like the following:
./ephemeral-hdfs/bin/hadoop fs -mkdir /spark_logs
Then I started the history server like the following.

./start-history-server.sh hdfs:///spark_logs --port 18080

In order to see the history server UI I needed to open up inbound traffic
on port 18080 in AWS, as follows:

custom TCP  port 18080 from anywhere

Hope this will help others.

Zhen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361p7436.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using Spark to crack passwords

2014-06-11 Thread Marek Wiewiorka
What about rainbow tables?
http://en.wikipedia.org/wiki/Rainbow_table

M.


2014-06-12 2:41 GMT+02:00 DB Tsai dbt...@stanford.edu:

 I think creating the samples in the search space within RDD will be
 too expensive, and the amount of data will probably be larger than any
 cluster.

 However, you could create a RDD of searching ranges, and each range
 will be searched by one map operation. As a result, in this design,
 the # of row in RDD will be the same as the # of executors, and we can
 use mapPartition to loop through all the sample in the range without
 actually storing them in RDD.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, Jun 11, 2014 at 5:24 PM, Nick Chammas
 nicholas.cham...@gmail.com wrote:
  Spark is obviously well-suited to crunching massive amounts of data. How
  about to crunch massive amounts of numbers?
 
  A few years ago I put together a little demo for some co-workers to
  demonstrate the dangers of using SHA1 to hash and store passwords. Part
 of
  the demo included a live brute-forcing of hashes to show how SHA1's speed
  made it unsuitable for hashing passwords.
 
  I think it would be cool to redo the demo, but utilize the power of a
  cluster managed by Spark to crunch through hashes even faster.
 
  But how would you do that with Spark (if at all)?
 
  I'm guessing you would create an RDD that somehow defined the search
 space
  you're going to go through, and then partition it to divide the work up
  equally amongst the cluster's cores. Does that sound right?
 
  I wonder if others have already used Spark for computationally-intensive
  workloads like this, as opposed to just data-intensive ones.
 
  Nick
 
 
  
  View this message in context: Using Spark to crack passwords
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Not fully cached when there is enough memory

2014-06-11 Thread Shuo Xiang
Xiangrui, clicking into the RDD link, it gives the same message, saying only
96 of 100 partitions are cached. The disk/memory usage is the same, which
is far below the limit.
Is this what you wanted me to check, or is it some other issue?


On Wed, Jun 11, 2014 at 4:38 PM, Xiangrui Meng men...@gmail.com wrote:

 Could you try to click one that RDD and see the storage info per
 partition? I tried continuously caching RDDs, so new ones kick old
 ones out when there is not enough memory. I saw similar glitches but
 the storage info per partition is correct. If you find a way to
 reproduce this error, please create a JIRA. Thanks! -Xiangrui



Re: When to use CombineByKey vs reduceByKey?

2014-06-11 Thread Matei Zaharia
combineByKey is designed for when your return type from the aggregation is 
different from the values being aggregated (e.g. you group together objects), 
and it should allow you to modify the leftmost argument of each function 
(mergeCombiners, mergeValue, etc) and return that instead of allocating a new 
object. So it should work with mutable objects — please post what problems you 
had with that. reduceByKey actually also allows this if your types are the same.

Matei

On Jun 11, 2014, at 3:21 PM, Diana Hu siyin...@gmail.com wrote:

 Hello all,
 
 I've seen some performance improvements using combineByKey as opposed to 
 reduceByKey or a groupByKey+map function. I have a couple questions. it'd be 
 great if any one can provide some light into this.
 
 1) When should I use combineByKey vs reduceByKey?
 
 2) Do the containers need to be immutable for combineByKey? I've created 
 custom data structures for the containers, one mutable and one immutable. The 
 tests with the mutable containers, spark crashed with an error on missing 
 references. However the downside of immutable containers (which works on my 
 tests), is that for large datasets the garbage collector gets called many 
 more times, and it tends to run out of heap space as the GC can't catch up. I 
 tried some of the tips here 
 http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuning the 
 JVM params, but this seems to be too much tuning?
 
 Thanks in advance,
 - Diana



Re: Using Spark to crack passwords

2014-06-11 Thread Nicholas Chammas
Yes, I mean the RDD would just have elements to define partitions or
ranges within the search space, not actual hashes. It's really just
using the RDD as a control structure, rather than as a real data set.

As you noted, we don't need to store any hashes. We just need to check them
as they are computed against our target hash.
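
To make that concrete, here is a rough sketch (illustrative only -- targetHash
and the tiny two-character search space are made up) of an RDD of search-space
slices plus mapPartitions to enumerate and check candidates without ever
storing them:

    import java.security.MessageDigest

    val targetHash = "..."                       // hex SHA-1 we are looking for
    val charset = ('a' to 'z').map(_.toString)

    // the RDD is just a control structure: one element per slice of the space
    val prefixes = sc.parallelize(charset, charset.size)

    val hits = prefixes.mapPartitions { iter =>
      val md = MessageDigest.getInstance("SHA-1")
      for {
        prefix <- iter
        suffix <- charset.iterator               // enumerate candidates lazily
        candidate = prefix + suffix
        hex = md.digest(candidate.getBytes("UTF-8")).map("%02x".format(_)).mkString
        if hex == targetHash
      } yield candidate
    }
    hits.collect()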

On Wednesday, June 11, 2014, DB Tsai dbt...@stanford.edu wrote:

 I think creating the samples in the search space within RDD will be
 too expensive, and the amount of data will probably be larger than any
 cluster.

 However, you could create a RDD of searching ranges, and each range
 will be searched by one map operation. As a result, in this design,
 the # of row in RDD will be the same as the # of executors, and we can
 use mapPartition to loop through all the sample in the range without
 actually storing them in RDD.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, Jun 11, 2014 at 5:24 PM, Nick Chammas
 nicholas.cham...@gmail.com javascript:; wrote:
  Spark is obviously well-suited to crunching massive amounts of data. How
  about to crunch massive amounts of numbers?
 
  A few years ago I put together a little demo for some co-workers to
  demonstrate the dangers of using SHA1 to hash and store passwords. Part
 of
  the demo included a live brute-forcing of hashes to show how SHA1's speed
  made it unsuitable for hashing passwords.
 
  I think it would be cool to redo the demo, but utilize the power of a
  cluster managed by Spark to crunch through hashes even faster.
 
  But how would you do that with Spark (if at all)?
 
  I'm guessing you would create an RDD that somehow defined the search
 space
  you're going to go through, and then partition it to divide the work up
  equally amongst the cluster's cores. Does that sound right?
 
  I wonder if others have already used Spark for computationally-intensive
  workloads like this, as opposed to just data-intensive ones.
 
  Nick
 
 
  
  View this message in context: Using Spark to crack passwords
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



use spark-shell in the source

2014-06-11 Thread JaeBoo Jung

Hi all,

Can I use spark-shell programmatically in my Spark application (in Java or Scala)?
I want to convert Scala lines to a string array and run them automatically in my application.
For example,
for (line <- lines) {
  // run this line in spark-shell style and get the outputs
  run(line)
}
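
Something like the following is what I have in mind (a rough, untested sketch
using the plain Scala interpreter; I am not sure how well it plays with a
running SparkContext):

    import scala.tools.nsc.Settings
    import scala.tools.nsc.interpreter.IMain

    val settings = new Settings
    settings.usejavacp.value = true      // use the application's classpath
    val interp = new IMain(settings)

    for (line <- lines) {
      interp.interpret(line)             // evaluate the line as the REPL would
    }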
Thanks
_ 
JaeBoo Jung, Assistant Engineer / BDA Lab / Samsung SDS