RE: Question about RDD cache, unpersist, materialization
If you want to force materialization, use .count(). Also, if you can, simply don't unpersist anything unless you really need to free the memory.

— Sent from Mailbox

On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote:

BTW, it is possible that rdd.first() does not compute all the partitions. So first() cannot be used for the situation below.

-Original Message-
From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr]
Sent: Wednesday, June 11, 2014 11:40 AM
To: user@spark.apache.org
Subject: Question about RDD cache, unpersist, materialization

Hi,

What I (seem to) know about the RDD persisting API is as follows:

- cache() and persist() are not actions. They only mark the RDD.
- unpersist() is also not an action. It only removes the marking. But if the RDD is already in memory, it is unloaded.

And there seems to be no API to forcefully materialize an RDD without requesting data via an action method, for example first().

So, I am faced with the following scenario:

    {
      JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());  // create empty RDD for merging
      for (int i = 0; i < 10; i++) {
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
        rdd.cache();  // Since it will be used twice, cache.
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);  // Transform and save; rdd materializes
        rddUnion = rddUnion.union(rdd.map(...).filter(...));  // Do another transform to T and merge by union
        rdd.unpersist();  // Now it seems not needed. (But needed actually)
      }
      // Here, rddUnion actually materializes, and needs all 10 rdds that were already unpersisted.
      // So, rebuilding all 10 rdds will occur.
      rddUnion.saveAsTextFile(mergedFileName);
    }

If rddUnion could be materialized and cache()d before the rdd.unpersist() line, the rdds in the loop would not be needed on rddUnion.saveAsTextFile().

Now what is the best strategy?

- Do not unpersist any of the 10 rdds in the loop.
- Materialize rddUnion in the loop by calling a 'light' action API, like first().
- Give up and just rebuild/reload all 10 rdds when saving rddUnion. Is there some misunderstanding? Thanks.
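As a sketch of the second strategy (our own illustrative Scala version of the loop above, not tested against a cluster; transformA/filterA etc. stand in for the elided functions, and count() is used instead of first() because count() computes every partition):

```scala
// Hypothetical sketch: cache the growing union and run a full action on it
// before unpersisting the per-file RDD, so the final saveAsTextFile reads
// from the cache instead of rebuilding all 10 inputs.
var rddUnion = sc.parallelize(Seq.empty[String])
for (i <- 0 until 10) {
  val rdd = sc.textFile(inputFileNames(i))
  rdd.cache()
  rdd.map(transformA).filter(filterA).saveAsTextFile(outputFileNames(i))
  rddUnion = rddUnion.union(rdd.map(transformB).filter(filterB)).cache()
  rddUnion.count()  // unlike first(), count() materializes every partition
  rdd.unpersist()   // safe now: rddUnion's partitions are already cached
}
rddUnion.saveAsTextFile(mergedFileName)
```

The trade-off: each iteration's count() re-reads the cached union so far, and a cache is kept for each intermediate union, so when memory allows, simply not unpersisting inside the loop may still be cheaper.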
Re: Spark Streaming not processing file with particular number of entries
Well, I was able to get it to work by running Spark over Mesos. But it looks like a bug when running Spark alone.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-tp6694p7382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: little confused about SPARK_JAVA_OPTS alternatives
Hi,

I'm facing a similar problem. According to http://tachyon-project.org/Running-Spark-on-Tachyon.html, in order to allow the Tachyon client to connect to the Tachyon master in HA mode, you need to pass 2 system properties:

    -Dtachyon.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181
    -Dtachyon.usezookeeper=true

Previously I was doing it with SPARK_JAVA_OPTS. Now I am trying it this way in spark-defaults.conf:

    ...
    spark.executor.extraJavaOptions -Dtachyon.max.columns=1 -Dtachyon.usezookeeper=true -Dtachyon.zookeeper.address=hadoop-zoo-1:2181,hadoop-zoo-2:2181,hadoop-zoo-3:2181
    ...

However I am getting an exception that the connection string (the zk string) is not set:

    14/06/11 06:32:15 INFO : initialize(tachyon-ft://hadoop-ha-1:19998/tmp/users.txt, Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml). Connecting to Tachyon: tachyon-ft://hadoop-ha-1:19998/tmp/users.txt
    14/06/11 06:32:15 INFO : Trying to connect master @ hadoop-ha-1/15.253.91.167:19998
    14/06/11 06:32:15 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value.
    Exception in thread "main" java.lang.NullPointerException: connectionString cannot be null
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
        at org.apache.curator.ensemble.fixed.FixedEnsembleProvider.<init>(FixedEnsembleProvider.java:39)
        at org.apache.curator.framework.CuratorFrameworkFactory$Builder.connectString(CuratorFrameworkFactory.java:176)
        at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91)
        at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:76)
        at tachyon.LeaderInquireClient.<init>(LeaderInquireClient.java:48)
        at tachyon.LeaderInquireClient.getClient(LeaderInquireClient.java:57)
        at tachyon.master.MasterClient.getMasterAddress(MasterClient.java:96)

Any help appreciated, it's really a blocker for me.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/little-confused-about-SPARK-JAVA-OPTS-alternatives-tp5798p7383.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
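One thing worth checking (an assumption on our part, not a confirmed fix): spark.executor.extraJavaOptions only affects executor JVMs, while a tachyon-ft:// path is also resolved by the Tachyon client on the driver, so the driver JVM may need the same properties. A sketch of such a spark-defaults.conf, with placeholder host names:

```
# Hypothetical spark-defaults.conf fragment: pass the ZooKeeper properties
# to both the driver and the executors.
spark.driver.extraJavaOptions   -Dtachyon.usezookeeper=true -Dtachyon.zookeeper.address=hadoop-zoo-1:2181,hadoop-zoo-2:2181,hadoop-zoo-3:2181
spark.executor.extraJavaOptions -Dtachyon.usezookeeper=true -Dtachyon.zookeeper.address=hadoop-zoo-1:2181,hadoop-zoo-2:2181,hadoop-zoo-3:2181
```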
Re: Problem in Spark Streaming
I used these commands to show the GC timings:

    -verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps

Following is the output I got on the standard output:

    4.092: [GC 4.092: [ParNew: 274752K->27199K(309056K), 0.0421460 secs] 274752K->27199K(995776K), 0.0422720 secs] [Times: user=0.33 sys=0.11, real=0.04 secs]
    16.630: [GC 16.630: [ParNew: 301951K->17854K(309056K), 0.0686940 secs] 301951K->23624K(995776K), 0.0689110 secs] [Times: user=0.36 sys=0.05, real=0.07 secs]
    32.440: [GC 32.441: [ParNew: 292606K->14985K(309056K), 0.0206040 secs] 298376K->20755K(995776K), 0.0208320 secs] [Times: user=0.20 sys=0.00, real=0.02 secs]
    42.626: [GC 42.626: [ParNew: 289737K->15467K(309056K), 0.0138100 secs] 295507K->21237K(995776K), 0.0139830 secs] [Times: user=0.10 sys=0.00, real=0.01 secs]
    56.633: [GC 56.633: [ParNew: 290219K->17334K(309056K), 0.0170930 secs] 295989K->23105K(995776K), 0.0173130 secs] [Times: user=0.12 sys=0.01, real=0.02 secs]

Can anyone help me to understand these messages related to GC?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Problem in Spark Streaming
http://stackoverflow.com/questions/895444/java-garbage-collection-log-messages
http://stackoverflow.com/questions/16794783/how-to-read-a-verbosegc-output

I think these will help in understanding the logs.

On Wed, Jun 11, 2014 at 12:53 PM, nilmish nilmish@gmail.com wrote:

I used these commands to show the GC timings:

    -verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps

Following is the output I got on the standard output:

    4.092: [GC 4.092: [ParNew: 274752K->27199K(309056K), 0.0421460 secs] 274752K->27199K(995776K), 0.0422720 secs] [Times: user=0.33 sys=0.11, real=0.04 secs]
    16.630: [GC 16.630: [ParNew: 301951K->17854K(309056K), 0.0686940 secs] 301951K->23624K(995776K), 0.0689110 secs] [Times: user=0.36 sys=0.05, real=0.07 secs]
    32.440: [GC 32.441: [ParNew: 292606K->14985K(309056K), 0.0206040 secs] 298376K->20755K(995776K), 0.0208320 secs] [Times: user=0.20 sys=0.00, real=0.02 secs]
    42.626: [GC 42.626: [ParNew: 289737K->15467K(309056K), 0.0138100 secs] 295507K->21237K(995776K), 0.0139830 secs] [Times: user=0.10 sys=0.00, real=0.01 secs]
    56.633: [GC 56.633: [ParNew: 290219K->17334K(309056K), 0.0170930 secs] 295989K->23105K(995776K), 0.0173130 secs] [Times: user=0.12 sys=0.01, real=0.02 secs]

Can anyone help me to understand these messages related to GC?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
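For reference, each of those lines follows the standard HotSpot ParNew format (an assumption based on the flags used): the leading number is seconds since JVM start, the first `before->after(capacity)` triple is young-generation occupancy around the collection, and the second triple is the whole heap. An illustrative parser for one line (field names are our own, not a standard API):

```scala
// Parse a single ParNew GC log line into its key numbers (sizes in KB).
case class GcEvent(atSec: Double, youngBeforeK: Long, youngAfterK: Long,
                   heapBeforeK: Long, heapAfterK: Long, realSec: Double)

def parseParNew(line: String): Option[GcEvent] = {
  // timestamp: [GC ts: [ParNew: yb->ya(ycap), t secs] hb->ha(hcap), t secs] [Times: ...]
  val re = """(\d+\.\d+): \[GC [\d.]+: \[ParNew: (\d+)K->(\d+)K\(\d+K\), [\d.]+ secs\] (\d+)K->(\d+)K\(\d+K\), [\d.]+ secs\] \[Times: user=[\d.]+ sys=[\d.]+, real=([\d.]+) secs\]""".r
  line match {
    case re(t, yb, ya, hb, ha, r) =>
      Some(GcEvent(t.toDouble, yb.toLong, ya.toLong, hb.toLong, ha.toLong, r.toDouble))
    case _ => None
  }
}

val sample = "4.092: [GC 4.092: [ParNew: 274752K->27199K(309056K), 0.0421460 secs] 274752K->27199K(995776K), 0.0422720 secs] [Times: user=0.33 sys=0.11, real=0.04 secs]"
val ev = parseParNew(sample).get
// e.g. ev.youngBeforeK / ev.youngAfterK show the young gen shrinking from
// ~268 MB to ~27 MB in a 0.04 s real-time pause.
```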
Number of Spark streams in Yarn cluster
Hi,

I am trying to get a sense of the number of streams we can process in parallel on a Spark Streaming cluster (Hadoop YARN). Is there any benchmark for this? We need a large number of streams (original + transformed) to be processed in parallel. The number is approximately 30.

thanks
Negi

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Number-of-Spark-streams-in-Yarn-cluster-tp7386.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
Thanks Tobias for replying. The problem was that I have to provide the dependency jars' paths to the StreamingContext within the code. So, providing all the jar paths resolved my problem. Refer to the below code snippet:

    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "SparkStreamExample", new Duration(1000),
        System.getenv("SPARK_HOME"),
        new String[] {JavaStreamingContext.jarOfClass(SparkStreamExample.class)[0],
            "/usr/local/spark-0.9.1-bin-hadoop2/external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar",
            "/usr/lib/hbase/lib/zookeeper.jar",
            "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/zkclient-0.3.jar",
            "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar",
            "/usr/local/scala/lib/scala-library.jar",
            "/usr/local/shark-0.9.1-bin-hadoop2/lib_managed/jars/com.yammer.metrics/metrics-core/metrics-core-2.1.2.jar",
            "/usr/local/hbase.jar"});

The question is: isn't there any other way of doing this? The above approach doesn't seem good to me. For example, what if I execute the application on some other cluster where the dependency paths are different? It is also not feasible to parametrize these jar paths as user arguments. Any advice will be appreciated.

Regards,
Gaurav

On Mon, Jun 9, 2014 at 6:23 AM, Tobias Pfeiffer [via Apache Spark User List] ml-node+s1001560n7216...@n3.nabble.com wrote:

Gaurav,

I am not sure that the * expands to what you expect it to do. Normally bash expands * to a space-separated string, not colon-separated. Try specifying all the jars manually, maybe?

Tobias

On Thu, Jun 5, 2014 at 6:45 PM, Gaurav Dasgupta [hidden email] wrote:

Hi,

I have written my own custom Spark Streaming code which connects to a Kafka server and fetches data. I have tested the code in local mode and it is working fine. But when I execute the same code in YARN mode, I get a KafkaReceiver class-not-found exception.
I am providing the Spark Kafka jar in the classpath and ensured that the path is correct for all the nodes in my cluster. I am using the Spark 0.9.1 Hadoop pre-built, deployed on all the nodes (10 node cluster) in the YARN cluster. I am using the following command to run my code in YARN mode:

    SPARK_YARN_MODE=true SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar java -cp /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/ SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic NewTestTable 1

Below is the error message I am getting:

    14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set 2.0 with 1 tasks
    14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
    14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 2971 bytes in 2 ms
    14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0)
    14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
    java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
        at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:247)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
        at
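One way around hard-coding the jar list (a sketch with a hypothetical helper, not an official Spark API): derive it at runtime from a small set of configurable base directories, so only those directories differ per cluster.

```scala
import java.io.File

// Hypothetical helper: collect all *.jar files under the given directories
// (which could come from a properties file or environment variable per
// cluster), instead of hard-coding absolute jar paths in the application.
def collectJars(dirs: Seq[String]): Array[String] =
  dirs.flatMap { d =>
    Option(new File(d).listFiles()).getOrElse(Array.empty[File])  // tolerate missing dirs
      .filter(f => f.isFile && f.getName.endsWith(".jar"))
      .map(_.getAbsolutePath)
  }.toArray
```

The resulting array could then be passed to the JavaStreamingContext constructor in place of the literal String[] above.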
Re: HDFS Server/Client IPC version mismatch while trying to access HDFS files using Spark-0.9.1
Any suggestions from anyone?

Thanks
Bijoy

On Tue, Jun 10, 2014 at 11:46 PM, bijoy deb bijoy.comput...@gmail.com wrote:

Hi all,

I have built Shark-0.9.1 using sbt with the below command:

    SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.6.0 sbt/sbt assembly

My Hadoop cluster is also on version 2.0.0-mr1-cdh4.6.0. But when I try to execute the below command from the Spark shell, which reads a file from HDFS, I get the "IPC version mismatch - IPC version 7 on server versus IPC version 4 on client" error on the org.apache.hadoop.hdfs.DFSClient class:

    scala> val s = sc.textFile("hdfs://host:port/test.txt")
    scala> s.count()
    14/06/10 23:42:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    14/06/10 23:42:59 WARN snappy.LoadSnappy: Snappy native library not loaded
    org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4
        at org.apache.hadoop.ipc.Client.call(Client.java:1070)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
        at com.sun.proxy.$Proxy9.getProtocolVersion(Unknown Source)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
        at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)

Apparently this error is because of a version mismatch of the hadoop-hdfs jar between the client (the one referred to by Spark) and the server (the Hadoop cluster). But what I don't understand is why there is a mismatch, since I built Spark with the correct Hadoop version. Any suggestions would be highly appreciated.

Thanks
Bijoy
Re: Hanging Spark jobs
These stack traces come from the stuck node? Looks like it's waiting on data in BlockFetcherIterator, i.e. waiting for data from another node. But you say all other nodes were done? Very curious. Maybe you could try turning on debug logging, and try to figure out what happens in BlockFetcherIterator (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala). I do not think it is supposed to get stuck indefinitely.

On Tue, Jun 10, 2014 at 8:22 PM, Hurwitz, Daniel dhurw...@ebay.com wrote:

Hi,

We are observing a recurring issue where our Spark jobs are hanging for several hours, even days, until we kill them. We are running Spark v0.9.1 over YARN.

Our input is a list of edges of a graph on which we use Bagel to compute connected components using the following method:

    class CCMessage(var targetId: Long, var myComponentId: Long) extends Message[Long] with Serializable

    def compute(self: CC, msgs: Option[Array[CCMessage]], superstep: Int): (CC, Array[CCMessage]) = {
      val smallestComponentId = msgs.map(sq => sq.map(_.myComponentId).min).getOrElse(Long.MaxValue)
      val newComponentId = math.min(self.clusterID, smallestComponentId)
      val halt = (newComponentId == self.clusterID) || (superstep >= maxIters)
      self.active = if (superstep == 0) true else !halt
      val outGoingMessages =
        if (halt && superstep > 0) Array[CCMessage]()
        else self.edges.map(targetId => new CCMessage(targetId, newComponentId)).toArray
      self.clusterID = newComponentId
      (self, outGoingMessages)
    }

Our output is a text file in which each line is a list of the node IDs in each component. The size of the output may be up to 6 GB. We see in the job tracker that most of the time jobs get stuck on the “saveAsTextFile” command, the final line in our code. In some cases, the job will hang during one of the iterations of Bagel during the computation of the connected components.
Oftentimes, when we kill the job and re-execute it, it will finish successfully within an hour which is the expected duration. We notice that if our Spark jobs don’t finish after a few hours, they will never finish until they are killed, regardless of the load on our cluster. After consulting with our Hadoop support team, they noticed that after a particular hanging Spark job was running for 38 hours, all Spark processes on all nodes were completed except for one node which was running more than 9 hours consuming very little CPU, then suddenly consuming 14s of CPU, then back to calm. Also, the other nodes were not relinquishing their resources until our Hadoop admin killed the process on that problematic node and suddenly the job finished and “success” was reported in the job tracker. The output seemed to be fine too. If it helps you understand the issue, the Hadoop admin suggested this was a Spark issue and sent us two stack dumps which I attached to this email: before killing the node’s Spark process (dump1.txt) and after (dump2.txt). Any advice on how to resolve this issue? How can we debug this? Thanks, ~Daniel
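For readers unfamiliar with the algorithm in the compute function above: it is min-label propagation, where every vertex repeatedly adopts the smallest component id seen among itself and its neighbours until nothing changes. A sequential, non-distributed sketch of the same idea (our own illustrative code, not the Bagel API; assumes a symmetric adjacency map):

```scala
// Min-label connected components: each vertex's final label is the smallest
// vertex id in its component. Bagel runs the analogous per-vertex update as
// message-passing supersteps; here we just sweep until a fixed point.
def connectedComponents(adj: Map[Long, Seq[Long]]): Map[Long, Long] = {
  var labels = adj.keys.map(v => v -> v).toMap  // every vertex starts in its own component
  var changed = true
  while (changed) {
    changed = false
    for ((v, nbrs) <- adj) {
      val smallest = (labels(v) +: nbrs.map(labels)).min  // min over self and neighbours
      if (smallest < labels(v)) {
        labels += (v -> smallest)
        changed = true
      }
    }
  }
  labels
}
```

Each sweep only ever lowers labels and labels are bounded below by the component's minimum id, so the loop terminates.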
Spark SQL incorrect result on GROUP BY query
Hi,

I am using Spark 1.0.0 and found that in Spark SQL some queries using GROUP BY give weird results. To reproduce, type the following commands in spark-shell connecting to a standalone server:

    case class Foo(k: String, v: Int)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++ List.fill(300)(Foo("c", 3))
    sc.makeRDD(rows).registerAsTable("foo")
    sql("select k,count(*) from foo group by k").collect

the result will be something random like:

    res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75], [c,270], [4,56], [1,1])

and if I run the same query again, the new result will be correct:

    sql("select k,count(*) from foo group by k").collect
    res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

Should I file a bug?

--
Pei-Lun Lee
Normalizations in MLBase
Hi All,

I have to normalize a set of values in the range 0-500 to the [0-1] range. Is there any util method in MLBase to normalize a large set of data?

BR,
Aslan
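Whether MLlib/MLBase shipped a normalization utility at this point is unclear to us; as a fallback, min-max scaling is a small map. A sketch on a plain collection (the same formula applies per element to an RDD via `rdd.map`, with the min/max computed by `rdd.min()`/`rdd.max()` or a single `aggregate`):

```scala
// Min-max normalization: map values from [min, max] onto [0, 1].
// For the stated 0-500 range, lo = 0 and hi = 500; computed here for generality.
def minMaxNormalize(xs: Seq[Double]): Seq[Double] = {
  val lo = xs.min
  val hi = xs.max
  require(hi > lo, "all values equal; min-max normalization is undefined")
  xs.map(x => (x - lo) / (hi - lo))
}
```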
RE: Spark SQL incorrect result on GROUP BY query
That’s a good catch, but I think it’s suggested to use HiveContext currently (https://github.com/apache/spark/tree/master/sql).

    Catalyst$ sbt/sbt hive/console

    case class Foo(k: String, v: Int)
    val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++ List.fill(300)(Foo("c", 3))
    sparkContext.makeRDD(rows).registerAsTable("foo")
    sql("select k,count(*) from foo group by k").collect
    res1: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

Cheng Hao

From: Pei-Lun Lee [mailto:pl...@appier.com]
Sent: Wednesday, June 11, 2014 6:01 PM
To: user@spark.apache.org
Subject: Spark SQL incorrect result on GROUP BY query

Hi,

I am using Spark 1.0.0 and found that in Spark SQL some queries using GROUP BY give weird results. To reproduce, type the following commands in spark-shell connecting to a standalone server:

    case class Foo(k: String, v: Int)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++ List.fill(300)(Foo("c", 3))
    sc.makeRDD(rows).registerAsTable("foo")
    sql("select k,count(*) from foo group by k").collect

the result will be something random like:

    res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75], [c,270], [4,56], [1,1])

and if I run the same query again, the new result will be correct:

    sql("select k,count(*) from foo group by k").collect
    res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

Should I file a bug?

--
Pei-Lun Lee
Re: Information on Spark UI
About more succeeded tasks than total tasks:
- This can happen if you have enabled speculative execution. Some partitions can get processed multiple times.
- More commonly, the result of the stage may be used in a later calculation and has to be recalculated. This happens if some of the results were evicted from cache.

On Wed, Jun 11, 2014 at 2:23 AM, Shuo Xiang shuoxiang...@gmail.com wrote:

Hi, I came up with some confusion regarding the information on the Spark UI. The following info was gathered while factorizing a large matrix using ALS:

1. Some stages have more succeeded tasks than total tasks, which are displayed in the 5th column.
2. Duplicate stages with exactly the same stage ID (stages 1/3/7).
3. Clicking into some stages, some executors cannot be addressed. Does that mean loss of an executor, or does this not matter?

Any explanations are appreciated!
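For reference, speculative execution is off by default and is toggled through the spark.speculation keys (key names assumed from the Spark configuration docs of that era; double-check against your version):

```
# Hypothetical spark-defaults.conf fragment: enable speculative re-execution
# of slow tasks, one way "succeeded" can exceed "total" in the stage table.
spark.speculation            true
spark.speculation.multiplier 1.5
```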
Re: Error During ReceivingConnection
It looks like this was due to another executor on a different node closing the connection on its side. I found the entries below in the remote side's logs. Can anyone comment on why one ConnectionManager would close its connection to another node, and what could be tuned to avoid this? It did not have any errors on its side. This is from the ConnectionManager on the side shutting down the connection, not the ConnectionManager that had the "Connection reset by peer":

    14/06/10 18:51:14 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.125,45610)
    14/06/10 18:51:14 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(172.16.25.125,45610)

On Wed, Jun 11, 2014 at 8:38 AM, Surendranauth Hiraman suren.hira...@velos.io wrote:

I have a somewhat large job (10 GB input data, but it generates about 500 GB of data after many stages). Most tasks completed, but a few stragglers on the same node/executor are still active (but doing nothing) after about 16 hours. At about 3 to 4 hours in, the tasks that are hanging have the following in the worker logs. Any idea what config to tweak for this?
    14/06/10 18:51:10 WARN network.ReceivingConnection: Error reading from connection to ConnectionManagerId(172.16.25.108,37693)
    java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
        at sun.nio.ch.IOUtil.read(IOUtil.java:224)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
        at org.apache.spark.network.ReceivingConnection.read(Connection.scala:534)
        at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
    14/06/10 18:51:10 INFO network.ConnectionManager: Handling connection error on connection to ConnectionManagerId(172.16.25.108,37693)
    14/06/10 18:51:10 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.108,37693)
    14/06/10 18:51:10 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(172.16.25.108,37693)
    14/06/10 18:51:10 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.108,37693)
    14/06/10 18:51:10 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
    14/06/10 18:51:10 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.108,37693)
    14/06/10 18:51:10 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
    14/06/10 18:51:14 WARN network.ReceivingConnection: Error reading from connection to ConnectionManagerId(172.16.25.97,54918)
    java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
        at sun.nio.ch.IOUtil.read(IOUtil.java:224)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
        at org.apache.spark.network.ReceivingConnection.read(Connection.scala:534)
        at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
    14/06/10 18:51:14 INFO network.ConnectionManager: Handling connection error on connection to ConnectionManagerId(172.16.25.97,54918)
    14/06/10 18:51:14 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.97,54918)
    14/06/10 18:51:14 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(172.16.25.97,54918)
    14/06/10 18:51:14 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.97,54918)
    14/06/10 18:51:14 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
    14/06/10 18:51:14 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.97,54918)
    14/06/10 18:51:14 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
Re: Information on Spark UI
Daniel, Thanks for the explanation. On Wed, Jun 11, 2014 at 8:57 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: About more succeeded tasks than total tasks: - This can happen if you have enabled speculative execution. Some partitions can get processed multiple times. - More commonly, the result of the stage may be used in a later calculation, and has to be recalculated. This happens if some of the results were evicted from cache. On Wed, Jun 11, 2014 at 2:23 AM, Shuo Xiang shuoxiang...@gmail.com wrote: Hi, Came up with some confusion regarding the information on SparkUI. The following info is gathered while factorizing a large matrix using ALS: 1. some stages have more succeeded tasks than total tasks, which are displayed in the 5th column. 2. duplicate stages with exactly same stageID (stage 1/3/7) 3. Clicking into some stages, some executors cannot be addressed. Does that mean lost of executor or this does not matter? Any explanation are appreciated!
Re: Spark SQL incorrect result on GROUP BY query
I'd try rerunning with master. It is likely you are running into SPARK-1994 (https://issues.apache.org/jira/browse/SPARK-1994).

Michael

On Wed, Jun 11, 2014 at 3:01 AM, Pei-Lun Lee pl...@appier.com wrote:

Hi,

I am using Spark 1.0.0 and found that in Spark SQL some queries using GROUP BY give weird results. To reproduce, type the following commands in spark-shell connecting to a standalone server:

    case class Foo(k: String, v: Int)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++ List.fill(300)(Foo("c", 3))
    sc.makeRDD(rows).registerAsTable("foo")
    sql("select k,count(*) from foo group by k").collect

the result will be something random like:

    res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75], [c,270], [4,56], [1,1])

and if I run the same query again, the new result will be correct:

    sql("select k,count(*) from foo group by k").collect
    res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

Should I file a bug?

--
Pei-Lun Lee
Adding external jar to spark-shell classpath in spark 1.0
Hi,

I am currently using Spark 1.0 locally on Windows 7. I would like to use classes from an external jar in the spark-shell. I followed the instructions in: http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCALrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E

I have set ADD_JARS=my.jar SPARK_CLASSPATH=my.jar in spark-shell.cmd but this didn't work. I also tried running "spark-shell.cmd --jars my.jar --driver-class-path my.jar --driver-library-path my.jar" and it didn't work either. I cannot load any class from my jar into the spark shell. Btw, my.jar contains a simple Scala class.

Best regards,
Alexander
Re: Adding external jar to spark-shell classpath in spark 1.0
Ah, not that it should matter, but I'm on Linux and you seem to be on Windows... maybe there is something weird going on with the Windows launcher?

On Wed, Jun 11, 2014 at 10:34 AM, Marcelo Vanzin van...@cloudera.com wrote:

Just tried this and it worked fine for me:

    ./bin/spark-shell --jars jar1,jar2,etc,etc

On Wed, Jun 11, 2014 at 10:25 AM, Ulanov, Alexander alexander.ula...@hp.com wrote:

Hi,

I am currently using Spark 1.0 locally on Windows 7. I would like to use classes from an external jar in the spark-shell. I followed the instructions in: http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCALrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E

I have set ADD_JARS="my.jar" SPARK_CLASSPATH="my.jar" in spark-shell.cmd but this didn't work. I also tried running "spark-shell.cmd --jars my.jar --driver-class-path my.jar --driver-library-path my.jar" and it didn't work either. I cannot load any class from my jar into the spark shell. Btw, my.jar contains a simple Scala class.

Best regards,
Alexander

--
Marcelo

--
Marcelo
Re: pmml with augustus
Hello Spark/PMML enthusiasts,

It's pretty trivial to integrate the JPMML-Evaluator library with Spark. In brief, take the following steps in your Spark application code:

1) Create a Java Map (arguments) that represents the input data record. You need to specify a key-value mapping for every active MiningField. The key type is org.jpmml.evaluator.FieldName. The value type could be String or any Java primitive data type that can be converted to the requested PMML type.
2) Obtain an instance of org.jpmml.evaluator.Evaluator. Invoke its #evaluate(Map<FieldName, ?>) method using the argument map created in step 1.
3) Process the Java Map (results) that represents the output data record.

Putting it all together:

    JavaRDD<Map<FieldName, String>> arguments = ...

    final ModelEvaluator<?> modelEvaluator = (ModelEvaluator<?>)pmmlManager.getModelManager(null, ModelEvaluatorFactory.getInstance()); // See the JPMML-Evaluator documentation

    JavaRDD<Map<FieldName, ?>> results = arguments.flatMap(new FlatMapFunction<Map<FieldName, String>, Map<FieldName, ?>>(){
      @Override
      public Iterable<Map<FieldName, ?>> call(Map<FieldName, String> arguments){
        Map<FieldName, ?> result = modelEvaluator.evaluate(arguments);
        return Collections.<Map<FieldName, ?>>singletonList(result);
      }
    });

Of course, it's not very elegant to be using JavaRDD<Map<K, V>> here. Maybe someone can give me a hint about making it look and feel more Spark-y?

Also, I would like to refute the earlier comment by @pacoid that JPMML-Evaluator compares poorly against Augustus and Zementis products. First, JPMML-Evaluator fully supports PMML specification versions 3.0 through 4.2. I would specifically stress the support for PMML 4.2, which was released just a few months ago. Second, JPMML is open source. Perhaps its licensing terms could be more liberal, but it's nevertheless the most open and approachable way of bringing Java and PMML together.
VR -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pmml-with-augustus-tp7313p7412.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Adding external jar to spark-shell classpath in spark 1.0
Are you able to import any class from your jars within spark-shell? -Original Message- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: Wednesday, June 11, 2014 9:36 PM To: user@spark.apache.org Subject: Re: Adding external jar to spark-shell classpath in spark 1.0 Ah, not that it should matter, but I'm on Linux and you seem to be on Windows... maybe there is something weird going on with the Windows launcher? On Wed, Jun 11, 2014 at 10:34 AM, Marcelo Vanzin van...@cloudera.com wrote: Just tried this and it worked fine for me: ./bin/spark-shell --jars jar1,jar2,etc,etc On Wed, Jun 11, 2014 at 10:25 AM, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi, I am currently using spark 1.0 locally on Windows 7. I would like to use classes from an external jar in the spark-shell. I followed the instructions in: http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCALrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E I have set ADD_JARS="my.jar" SPARK_CLASSPATH="my.jar" in spark-shell.cmd but this didn't work. I also tried running "spark-shell.cmd --jars my.jar --driver-class-path my.jar --driver-library-path my.jar" and it didn't work either. I cannot load any class from my jar into the spark shell. Btw my.jar contains a simple Scala class. Best regards, Alexander -- Marcelo -- Marcelo
RE: Adding external jar to spark-shell classpath in spark 1.0
Could you elaborate on this? I don't have an application, I just use the spark shell. From: Andrew Or [mailto:and...@databricks.com] Sent: Wednesday, June 11, 2014 9:40 PM To: user@spark.apache.org Subject: Re: Adding external jar to spark-shell classpath in spark 1.0 This is a known issue: https://issues.apache.org/jira/browse/SPARK-1919. We haven't found a fix yet, but for now, you can work around this by including your simple class in your application jar. 2014-06-11 10:25 GMT-07:00 Ulanov, Alexander alexander.ula...@hp.com: Hi, I am currently using spark 1.0 locally on Windows 7. I would like to use classes from an external jar in the spark-shell. I followed the instructions in: http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCALrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E I have set ADD_JARS="my.jar" SPARK_CLASSPATH="my.jar" in spark-shell.cmd but this didn't work. I also tried running "spark-shell.cmd --jars my.jar --driver-class-path my.jar --driver-library-path my.jar" and it didn't work either. I cannot load any class from my jar into the spark shell. Btw my.jar contains a simple Scala class. Best regards, Alexander
Re: HDFS Server/Client IPC version mismatch while trying to access HDFS files using Spark-0.9.1
The error is saying that your client libraries are older than what your server is using (2.0.0-mr1-cdh4.6.0 is IPC version 7). Try double-checking that your build is actually using that version (e.g., by looking at the hadoop jar files in lib_managed/jars). On Wed, Jun 11, 2014 at 2:07 AM, bijoy deb bijoy.comput...@gmail.com wrote: Any suggestions from anyone? Thanks Bijoy On Tue, Jun 10, 2014 at 11:46 PM, bijoy deb bijoy.comput...@gmail.com wrote: Hi all, I have built Shark-0.9.1 with sbt using the below command: SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.6.0 sbt/sbt assembly My Hadoop cluster is also on version 2.0.0-mr1-cdh4.6.0. But when I try to execute the below commands from the Spark shell, which read a file from HDFS, I get an IPC version mismatch error ("Server IPC version 7 cannot communicate with client version 4") from the org.apache.hadoop.hdfs.DFSClient class:

scala> val s = sc.textFile("hdfs://host:port/test.txt")
scala> s.count()

14/06/10 23:42:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/06/10 23:42:59 WARN snappy.LoadSnappy: Snappy native library not loaded org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 at org.apache.hadoop.ipc.Client.call(Client.java:1070) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) at com.sun.proxy.$Proxy9.getProtocolVersion(Unknown Source) at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396) at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379) at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208) Apparently this error is because of a version mismatch of the hadoop-hdfs jar between the client (the one referred to by Spark) and the server (the Hadoop cluster). But what I don't understand is why there is this mismatch, since I built Spark with the correct Hadoop version. Any suggestions would be highly appreciated. Thanks Bijoy -- Marcelo
Re: Information on Spark UI
Using MEMORY_AND_DISK_SER to persist the input RDD[Rating] seems to work right for me now. I'm testing on a larger dataset and will see how it goes. On Wed, Jun 11, 2014 at 9:56 AM, Neville Li neville@gmail.com wrote: Does cache eviction affect the disk storage level too? I tried cranking up replication but am still seeing this. On Wednesday, June 11, 2014, Shuo Xiang shuoxiang...@gmail.com wrote: Daniel, Thanks for the explanation. On Wed, Jun 11, 2014 at 8:57 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: About more succeeded tasks than total tasks: - This can happen if you have enabled speculative execution. Some partitions can get processed multiple times. - More commonly, the result of the stage may be used in a later calculation and has to be recalculated. This happens if some of the results were evicted from the cache. On Wed, Jun 11, 2014 at 2:23 AM, Shuo Xiang shuoxiang...@gmail.com wrote: Hi, I came across some confusion regarding the information on the Spark UI. The following info was gathered while factorizing a large matrix using ALS: 1. Some stages have more succeeded tasks than total tasks, as displayed in the 5th column. 2. There are duplicate stages with exactly the same stage ID (stages 1/3/7). 3. Clicking into some stages, some executors cannot be addressed. Does that mean loss of an executor, or does this not matter? Any explanations are appreciated!
Powered by Spark addition
Hello, I was wondering if we could add our organization to the Powered by Spark page. The information is: Name: Vistar Media URL: www.vistarmedia.com Description: Location technology company enabling brands to reach on-the-go consumers. Let me know if you need anything else. Thanks! Derek Mansen
Re: Having trouble with streaming (updateStateByKey)
I rearranged my code to do a reduceByKey which I think is working. I also don't think the problem was that updateState call, but something else; unfortunately I changed a lot in looking for this issue, so not sure what the actual fix might have been, but I think it's working now. On Wed, Jun 11, 2014 at 1:47 PM, Michael Campbell michael.campb...@gmail.com wrote: I'm having a little trouble getting an updateStateByKey() call to work; was wondering if anyone could help. In my chain of calls from getting Kafka messages out of the queue to converting the message to a set of things, then pulling out 2 attributes of those things to a Tuple2, everything works. So what I end up with is about a 1 second dump of things like this (this is crufted up data, but it's basically 2 IPV6 addresses...) --- Time: 1402507839000 ms --- (:::a14:b03,:::a0a:2bd4) (:::a14:b03,:::a0a:2bd4) (:::a0a:25a7,:::a14:b03) (:::a14:b03,:::a0a:2483) (:::a14:b03,:::a0a:2480) (:::a0a:2d96,:::a14:b03) (:::a0a:abb5,:::a14:100) (:::a0a:dcd7,:::a14:28) (:::a14:28,:::a0a:da94) (:::a14:b03,:::a0a:2def) ... This works ok. The problem is when I add a call to updateStateByKey - the streaming app runs and runs and runs and never outputs anything. When I debug, I can't confirm that my state update passed-in function is ever actually getting called. Indeed I have breakpoints and println statements in my updateFunc and it LOOKS like it's never getting called. I can confirm that the updateStateByKey function IS getting called (via it stopping on a breakpoint). Is there something obvious I'm missing?
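The updateStateByKey contract can be simulated in plain Scala without a StreamingContext. A minimal sketch, where updateFunc, the sample batch, and the previous state are illustrative inventions, not taken from the original code:

```scala
// Simulates the updateStateByKey contract on plain collections: for each key,
// the update function receives the batch's new values plus the previous state
// and returns the new state. All names and data here are illustrative.
def updateFunc(newValues: Seq[Int], state: Option[Int]): Option[Int] =
  Some(state.getOrElse(0) + newValues.sum)

// One "batch" of (key, count) pairs, and some pre-existing state:
val batch = Seq(("a14:b03", 1), ("a0a:25a7", 1), ("a14:b03", 1))
val previousState = Map("a14:b03" -> 5)

val newState = batch.groupBy(_._1).map { case (k, kvs) =>
  k -> updateFunc(kvs.map(_._2), previousState.get(k)).get
}
println(newState("a14:b03"))  // 7
println(newState("a0a:25a7")) // 1
```

If the streaming version never shows output at all, it is worth confirming (as above, where the function is called explicitly) that the update function compiles against this exact (Seq[V], Option[S]) => Option[S] shape, since a mismatched signature can silently select a different overload.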
Kafka client - specify offsets?
Is there a way in the Apache Spark Kafka Utils to specify an offset to start reading? Specifically, from the start of the queue, or failing that, a specific point?
Re: Normalizations in MLBase
Hi Aslan, Currently, we don't have a utility function to do so. However, you can easily implement this with another map transformation. I'm working on this feature now, and there will be a couple of different normalization options users can choose from. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Jun 11, 2014 at 6:25 AM, Aslan Bekirov aslanbeki...@gmail.com wrote: Hi All, I have to normalize a set of values in the range 0-500 to the [0-1] range. Is there any util method in MLBase to normalize a large set of data? BR, Aslan
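A minimal sketch of the map-transformation approach DB describes, in plain Scala (with an RDD you would apply the same function via rdd.map; the 0-500 bounds come from Aslan's question, and the function name is made up):

```scala
// Min-max normalization from [0, 500] to [0, 1], applied with an ordinary map.
// With an RDD the last line would be rdd.map(normalize(_)); shown on a Seq here.
def normalize(x: Double, min: Double = 0.0, max: Double = 500.0): Double =
  (x - min) / (max - min)

val values = Seq(0.0, 125.0, 250.0, 500.0)
val scaled = values.map(normalize(_))
println(scaled)  // List(0.0, 0.25, 0.5, 1.0)
```

For a distributed dataset the min and max would themselves come from two reduce passes (rdd.min() and rdd.max()) before the map, rather than being hard-coded.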
Re: json parsing with json4s
Hello, You're absolutely right, the syntax you're using is returning the json4s value objects, not native types like Int, Long etc. Fix that problem and then everything else (filters) will work as you expect. This is a short snippet of a larger example: [1]

val lines = sc.textFile("likes.json")
val user_interest = lines.map(line => {
  // Parse the JSON, returns RDD[JValue]
  parse(line)
}).map(json => {
  // Extract the values we need to populate the UserInterest class
  implicit lazy val formats = org.json4s.DefaultFormats
  val name = (json \ "name").extract[String]
  val location_x = (json \ "location" \ "x").extract[Double]
  val location_y = (json \ "location" \ "y").extract[Double]
  val likes = (json \ "likes").extract[Seq[String]].map(_.toLowerCase()).mkString(";")
  UserInterest(name, location_x, location_y, likes)
})

The key parts are implicit lazy val formats = org.json4s.DefaultFormats being defined before you mess with the JSON, and (json \ "location" \ "x").extract[Double] to extract the parts you need. One thing to be wary of is if your JSON is not consistent, i.e. fields not always being set -- then using the extract[Double] method will raise exceptions. Then you may wish to use an alternate way to pull out the values as a String and process them yourself, e.g. val id = compact(render(json \ "facebook" \ "id")) Good luck playing with JSON and Spark! :o) Best, MC [1] UserInterestsExample.scala https://gist.github.com/cotdp/b471cfff183b59d65ae1 On 11 June 2014 23:26, SK skrishna...@gmail.com wrote: I have the following piece of code that parses a json file and extracts the age and TypeID:

val p = sc.textFile(log_file)
  .map(line => { parse(line) })
  .map(json => {
    val v1 = json \ "person" \ "age"
    val v2 = json \ "Action" \ "Content" \ "TypeID"
    (v1, v2)
  })
p.foreach(r => println(r))

The result is: (JInt(12),JInt(5)) (JInt(32),JInt(6)) (JInt(40),JInt(7)) 1) How can I extract the values (i.e. without the JInt)? I tried returning (v1.toInt, v2.toInt) from the map but got a compilation error stating that toInt is not a valid operation. 2) I would also like to know how I can filter the above tuples based on the age values. For example, I added the following after the second map operation: p.filter(tup => tup._1 > 20) I got a compilation error: value > is not a member of org.json4s.JValue. Thanks for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/json-parsing-with-json4s-tp7430.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Powered by Spark addition
Alright, added you. Matei On Jun 11, 2014, at 1:28 PM, Derek Mansen de...@vistarmedia.com wrote: Hello, I was wondering if we could add our organization to the Powered by Spark page. The information is: Name: Vistar Media URL: www.vistarmedia.com Description: Location technology company enabling brands to reach on-the-go consumers. Let me know if you need anything else. Thanks! Derek Mansen
Re: Not fully cached when there is enough memory
Could you try clicking on that RDD and looking at the storage info per partition? I tried continuously caching RDDs, so new ones kick old ones out when there is not enough memory. I saw similar glitches, but the storage info per partition was correct. If you find a way to reproduce this error, please create a JIRA. Thanks! -Xiangrui
Re: Using Spark on Data size larger than Memory size
Thanks. We've run into timeout issues at scale as well. We were able to work around them by setting the following JVM options: -Dspark.akka.askTimeout=300 -Dspark.akka.timeout=300 -Dspark.worker.timeout=300 NOTE: these JVM options *must* be set on the worker nodes (and not just the driver/master) for the settings to take effect. Allen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-on-Data-size-larger-than-Memory-size-tp6589p7435.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: problem starting the history server on EC2
I tried everything including sudo, but it still did not work using the local directory. However, I finally got it working by having the history server log into HDFS. I first created a directory in HDFS like the following: ./ephemeral-hdfs/bin/hadoop fs -mkdir /spark_logs Then I started the history server like the following: ./start-history-server.sh hdfs:///spark_logs --port 18080 In order to see the history server UI I needed to open up inbound traffic for port 18080 in AWS, as follows: custom TCP, port 18080, from anywhere. Hope this will help others. Zhen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361p7436.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Using Spark to crack passwords
What about rainbow tables? http://en.wikipedia.org/wiki/Rainbow_table M. 2014-06-12 2:41 GMT+02:00 DB Tsai dbt...@stanford.edu: I think creating the samples in the search space within RDD will be too expensive, and the amount of data will probably be larger than any cluster. However, you could create a RDD of searching ranges, and each range will be searched by one map operation. As a result, in this design, the # of row in RDD will be the same as the # of executors, and we can use mapPartition to loop through all the sample in the range without actually storing them in RDD. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Jun 11, 2014 at 5:24 PM, Nick Chammas nicholas.cham...@gmail.com wrote: Spark is obviously well-suited to crunching massive amounts of data. How about to crunch massive amounts of numbers? A few years ago I put together a little demo for some co-workers to demonstrate the dangers of using SHA1 to hash and store passwords. Part of the demo included a live brute-forcing of hashes to show how SHA1's speed made it unsuitable for hashing passwords. I think it would be cool to redo the demo, but utilize the power of a cluster managed by Spark to crunch through hashes even faster. But how would you do that with Spark (if at all)? I'm guessing you would create an RDD that somehow defined the search space you're going to go through, and then partition it to divide the work up equally amongst the cluster's cores. Does that sound right? I wonder if others have already used Spark for computationally-intensive workloads like this, as opposed to just data-intensive ones. Nick View this message in context: Using Spark to crack passwords Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Not fully cached when there is enough memory
Xiangrui, clicking into the RDD link, it gives the same message, saying only 96 of 100 partitions are cached. The disk/memory usage is the same, which is far below the limit. Is this what you wanted me to check, or is it another issue? On Wed, Jun 11, 2014 at 4:38 PM, Xiangrui Meng men...@gmail.com wrote: Could you try to click one that RDD and see the storage info per partition? I tried continuously caching RDDs, so new ones kick old ones out when there is not enough memory. I saw similar glitches but the storage info per partition is correct. If you find a way to reproduce this error, please create a JIRA. Thanks! -Xiangrui
Re: When to use CombineByKey vs reduceByKey?
combineByKey is designed for when your return type from the aggregation is different from the values being aggregated (e.g. you group together objects), and it should allow you to modify the leftmost argument of each function (mergeCombiners, mergeValue, etc.) and return that instead of allocating a new object. So it should work with mutable objects — please post what problems you had with that. reduceByKey actually also allows this if your types are the same. Matei On Jun 11, 2014, at 3:21 PM, Diana Hu siyin...@gmail.com wrote: Hello all, I've seen some performance improvements using combineByKey as opposed to reduceByKey or a groupByKey+map function. I have a couple of questions; it'd be great if anyone can shed some light on this. 1) When should I use combineByKey vs reduceByKey? 2) Do the containers need to be immutable for combineByKey? I've created custom data structures for the containers, one mutable and one immutable. In the tests with the mutable containers, Spark crashed with an error about missing references. However, the downside of immutable containers (which work in my tests) is that for large datasets the garbage collector gets called many more times, and it tends to run out of heap space as the GC can't catch up. I tried some of the tips here http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuned the JVM params, but this seems to be too much tuning? Thanks in advance, - Diana
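A plain-Scala simulation of the three-function contract behind combineByKey (createCombiner, mergeValue, mergeCombiners), run over two in-memory "partitions". This is a sketch only: the data and helper names are made up, and it is not Spark's implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// createCombiner builds the aggregate type from a first value; mergeValue
// folds a further value into an existing aggregate (mutating and returning
// the left argument is allowed); mergeCombiners merges per-partition results.
def createCombiner(v: Int): ArrayBuffer[Int] = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[Int], v: Int): ArrayBuffer[Int] = { buf += v; buf }
def mergeCombiners(a: ArrayBuffer[Int], b: ArrayBuffer[Int]): ArrayBuffer[Int] = { a ++= b; a }

// Aggregate one "partition" of (key, value) pairs:
def aggregatePartition(part: Seq[(String, Int)]): Map[String, ArrayBuffer[Int]] =
  part.foldLeft(Map.empty[String, ArrayBuffer[Int]]) { case (acc, (k, v)) =>
    acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
  }

val part1 = Seq(("a", 1), ("b", 2), ("a", 3))
val part2 = Seq(("a", 4))
val combined = (aggregatePartition(part1).toSeq ++ aggregatePartition(part2).toSeq)
  .groupBy(_._1)
  .map { case (k, kvs) => k -> kvs.map(_._2).reduce(mergeCombiners) }

println(combined("a"))  // ArrayBuffer(1, 3, 4)
```

The point of the mutable ArrayBuffer here is exactly what Matei describes: mergeValue and mergeCombiners modify and return their leftmost argument instead of allocating a new container, which is what keeps GC pressure down relative to an immutable container.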
Re: Using Spark to crack passwords
Yes, I mean the RDD would just have elements to define partitions or ranges within the search space, not actual hashes. It's really just using the RDD as a control structure rather than a real data set. As you noted, we don't need to store any hashes. We just need to check them as they are computed against our target hash. On Wednesday, June 11, 2014, DB Tsai dbt...@stanford.edu wrote: I think creating the samples in the search space within the RDD will be too expensive, and the amount of data will probably be larger than any cluster. However, you could create an RDD of search ranges, and each range will be searched by one map operation. As a result, in this design, the # of rows in the RDD will be the same as the # of executors, and we can use mapPartition to loop through all the samples in the range without actually storing them in the RDD. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Jun 11, 2014 at 5:24 PM, Nick Chammas nicholas.cham...@gmail.com wrote: Spark is obviously well-suited to crunching massive amounts of data. How about crunching massive amounts of numbers? A few years ago I put together a little demo for some co-workers to demonstrate the dangers of using SHA1 to hash and store passwords. Part of the demo included live brute-forcing of hashes to show how SHA1's speed made it unsuitable for hashing passwords. I think it would be cool to redo the demo, but utilize the power of a cluster managed by Spark to crunch through hashes even faster. But how would you do that with Spark (if at all)? I'm guessing you would create an RDD that somehow defined the search space you're going to go through, and then partition it to divide the work up equally amongst the cluster's cores. Does that sound right? I wonder if others have already used Spark for computationally intensive workloads like this, as opposed to just data-intensive ones.
Nick View this message in context: Using Spark to crack passwords Sent from the Apache Spark User List mailing list archive at Nabble.com.
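The range-per-partition design from this thread can be sketched in plain Scala with java.security.MessageDigest. Here the list of ranges stands in for the RDD of search ranges, searchRange is what would run inside each mapPartitions call, and the target and candidate space are toy values:

```scala
import java.security.MessageDigest

// Hex-encoded SHA-1 of a candidate password.
def sha1Hex(s: String): String =
  MessageDigest.getInstance("SHA-1").digest(s.getBytes("UTF-8"))
    .map("%02x".format(_)).mkString

// Enumerate candidates in a numeric range lazily, without materializing them,
// and stop at the first candidate whose hash matches the target.
def searchRange(range: Range, target: String): Option[String] =
  range.iterator.map(_.toString).find(c => sha1Hex(c) == target)

val target = sha1Hex("1234")                      // stand-in for a stolen hash
val ranges = Seq(0 until 5000, 5000 until 10000)  // stand-in for the RDD of ranges
val cracked = ranges.flatMap(r => searchRange(r, target)).headOption
println(cracked)  // Some(1234)
```

In the Spark version, ranges would be an RDD with one element per task, and each task would run searchRange over its range and emit any hit, so no candidate or hash is ever stored in the RDD itself.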
use spark-shell in the source
Hi all, Can I use spark-shell programmatically in my Spark application (in Java or Scala)? I want to convert Scala lines to a string array and run them automatically in my application. For example:

for (line <- lines) {
  // run this line in spark-shell style and get the outputs
  run(line)
}

Thanks
JaeBoo Jung, Assistant Engineer / BDA Lab / Samsung SDS