Re: How to set java.library.path in a spark cluster
Add something like the following to spark-env.sh (and remove all 5 exports you listed):

export LD_LIBRARY_PATH=<path-to-your-native-libs>:$LD_LIBRARY_PATH

Then restart all worker nodes and try again. Good luck!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-java-library-path-in-a-spark-cluster-tp13854p13857.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
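For the mosek setup described in the original question, the spark-env.sh entry might look like this (a sketch; the library path is the one the asker posted, so adjust it to your own installation):

```shell
# conf/spark-env.sh on every node -- a sketch, assuming the mosek
# library directory from the original question; adjust to your layout.
export LD_LIBRARY_PATH=/home/chanda/mosek/7/tools/platform/linux64x86/bin:$LD_LIBRARY_PATH
```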
Re: Records - Input Byte
What do you mean by "control your input"? Are you trying to pace your Spark Streaming job by the number of words? If so, that is not supported as of now; you can only control the batch interval and consume all the files that arrive within that time period.

--
Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi

On Tue, Sep 9, 2014 at 2:24 AM, danilopds wrote:
> Hi,
> I was reading the Spark Streaming paper:
> "Discretized Streams: Fault-Tolerant Streaming Computation at Scale"
> I read that the performance evaluation used 100-byte input records in the
> Grep and WordCount tests.
> I don't have much experience, and I'd like to know how I can control this
> value in my records (like words in an input file)?
> Can anyone suggest something to get me started?
> Thanks!
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Records-Input-Byte-tp13733.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Table not found: using jdbc console to query sparksql hive thriftserver
You need to run mvn install so that the package you built is put into the local Maven repo. Then, when compiling your own app (with the right dependency specified), the package will be retrieved.

On 9/9/14, 8:16 PM, "alexandria1101" wrote:
> I think the package does not exist because I need to change the pom file:
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-assembly_2.10</artifactId>
>   <version>1.0.1</version>
>   <type>pom</type>
>   <scope>provided</scope>
> </dependency>
>
> I changed the version number to 1.1.1, yet still that causes the build
> error:
>
> Failure to find org.apache.spark:spark-assembly_2.10:pom:1.1.1 in
> http://repo.maven.apache.org/maven2 was cached in the local repository,
> resolution will not be reattempted until the update interval of central
> has elapsed or updates are forced -> [Help 1]
>
> Is there a way to get past this?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13851.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
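The suggested workflow might look like this (a sketch; run it from the Spark source tree you built, and make the version in your app's pom match the version that was installed):

```shell
# Install the locally built Spark artifacts into the local Maven
# repo (~/.m2), so a dependency on that exact version resolves
# locally instead of being looked up on Maven Central (a sketch).
mvn -DskipTests clean install
```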
Re: how to choose right DStream batch interval
Hi Mayur,

Thanks for your response. I wrote a simple test that sets up a DStream with 5 batches; the batch duration is 1 second, and the 3rd batch takes an extra 2 seconds. The output of the test shows that the 3rd batch causes a backlog, and Spark Streaming does catch up on the 4th and 5th batches (DStream.print was modified to also output system time):

---
Time: 1409959708000 ms, system time: 1409959708269
---
1155
---
Time: 1409959709000 ms, system time: 1409959709033
---
2255
delay 2000 ms
---
Time: 1409959710000 ms, system time: 1409959712036
---
3355
---
Time: 1409959711000 ms, system time: 1409959712059
---
4455
---
Time: 1409959712000 ms, system time: 1409959712083
---

Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13855.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
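A test like the one described might be set up as follows (a sketch, not the poster's actual code; `queueStream` feeds five pre-built RDDs, and a `Thread.sleep` in `foreachRDD` stands in for the slow 3rd batch):

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A sketch of the experiment above: 1-second batches, with the 3rd
// batch artificially delayed by 2 seconds to observe the backlog
// and how the following batches catch up.
object BatchIntervalTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BatchIntervalTest")
    val ssc = new StreamingContext(conf, Seconds(1))

    val queue = new mutable.Queue[RDD[Int]]()
    val stream = ssc.queueStream(queue)

    var batch = 0
    stream.foreachRDD { rdd =>
      batch += 1
      if (batch == 3) Thread.sleep(2000) // simulate a slow batch
      println(s"batch $batch, system time ${System.currentTimeMillis()}, count ${rdd.count()}")
    }

    (1 to 5).foreach(_ => queue += ssc.sparkContext.makeRDD(1 to 100))
    ssc.start()
    ssc.awaitTermination(10000) // run long enough to drain the 5 batches
    ssc.stop()
  }
}
```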
How to set java.library.path in a spark cluster
Hi,

I am working on a 3-machine Cloudera cluster. Whenever I submit a Spark job as a jar file with a native dependency on mosek, it shows the following error:

java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path

How should I set java.library.path? I printed the environment variable and it shows:

-Djava.library.path= -Xms512m -Xmx512m

I added the following lines to the spark-env.sh file, but it was of no help. The path contains both the mosek.jar and the libmosek_7.0.so files.

export SPARK_LIBRARY_PATH=${SPARK_HOME}/lib:/home/chanda/mosek/7/tools/platform/linux64x86/bin
export SPARK_MASTER_OPTS='-Djava.library.path="/home/chanda/mosek/7/tools/platform/linux64x86/bin'
export SPARK_WORKER_OPTS='/home/chanda/mosek/7/tools/platform/linux64x86/bin'
export SPARK_HISTORY_OPTS='/home/chanda/mosek/7/tools/platform/linux64x86/bin'
export SPARK_DAEMON_JAVA_OPTS='/home/chanda/mosek/7/tools/platform/linux64x86/bin'

Please help.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-java-library-path-in-a-spark-cluster-tp13854.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: how to setup steady state stream partitions
Thanks for your response. I do have something like:

val inputDStream = ...
val keyedDStream = inputDStream.map(...) // use sensorId as key
val partitionedDStream = keyedDStream.transform(rdd => rdd.partitionBy(new MyPartitioner(...)))
val stateDStream = partitionedDStream.updateStateByKey[...](updateFunction)

The partitionedDStream does have steady partitions, but the stateDStream does not. I.e., partition 0 of partitionedDStream contains only data for sensors 0 to 999, but partition 0 of stateDStream contains data for some of the sensors in the 0 to 999 range plus a lot of sensors from other partitions of partitionedDStream. I wish partition 0 of stateDStream contained only the data from partition 0 of partitionedDStream, partition 1 of stateDStream only the data from partition 1 of partitionedDStream, and so on. Does anyone know how to implement that?

Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850p13853.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
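One possible fix, sketched below under the assumption of Spark 1.x's PairDStreamFunctions API: updateStateByKey has an overload that accepts a Partitioner, so handing it the same partitioner used for partitionedDStream should keep the state RDDs co-partitioned with the input instead of falling back to the default HashPartitioner. MyPartitioner, the key/value types, and the update function below are all hypothetical stand-ins for the poster's own code:

```scala
import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._            // PairRDDFunctions (partitionBy), Spark 1.x
import org.apache.spark.streaming.StreamingContext._ // PairDStreamFunctions, Spark 1.x
import org.apache.spark.streaming.dstream.DStream

// A sketch: pass the SAME partitioner to both partitionBy and
// updateStateByKey so the state partitions line up with the input.
def steadyState(
    keyedDStream: DStream[(Int, Double)], // (sensorId, measurement) -- assumed types
    partitioner: Partitioner): DStream[(Int, Seq[Double])] = {

  val partitionedDStream = keyedDStream.transform(_.partitionBy(partitioner))

  // Hypothetical update function: append each batch's measurements to the state.
  val updateFunction = (newValues: Seq[Double], state: Option[Seq[Double]]) =>
    Some(state.getOrElse(Seq.empty) ++ newValues)

  // The overload taking a Partitioner keeps the state partitioning steady.
  partitionedDStream.updateStateByKey(updateFunction, partitioner)
}
```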
Re: how to setup steady state stream partitions
Using your own partitioner didn't work? e.g.

yourRDD.partitionBy(new HashPartitioner(yourNumber))

xj @ Tokyo

On Wed, Sep 10, 2014 at 12:03 PM, qihong wrote:
> I'm working on a DStream application. The input is sensors' measurements,
> the data format is
>
> There are ten thousand sensors, and updateStateByKey is used to maintain
> the states of the sensors; the code looks like the following:
>
> val inputDStream = ...
> val keyedDStream = inputDStream.map(...) // use sensorId as key
> val stateDStream = keyedDStream.updateStateByKey[...](updateFunction)
>
> Here's the question:
> In a cluster with 10 worker nodes, is it possible to partition the input
> DStream so that node 1 handles sensors 0-999, node 2 handles 1000-1999,
> and so on?
>
> Also, is it possible to keep the state stream for sensors 0-999 on node 1,
> 1000-1999 on node 2, etc.? Right now, I see the sensor state stream being
> shuffled for every batch, which uses a lot of network bandwidth and is
> unnecessary.
>
> Any suggestions?
>
> Thanks!
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Table not found: using jdbc console to query sparksql hive thriftserver
I think the package does not exist because I need to change the pom file:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-assembly_2.10</artifactId>
  <version>1.0.1</version>
  <type>pom</type>
  <scope>provided</scope>
</dependency>

I changed the version number to 1.1.1, yet still that causes the build error:

Failure to find org.apache.spark:spark-assembly_2.10:pom:1.1.1 in http://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced -> [Help 1]

Is there a way to get past this?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13851.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
how to setup steady state stream partitions
I'm working on a DStream application. The input is sensors' measurements, the data format is

There are ten thousand sensors, and updateStateByKey is used to maintain the states of the sensors; the code looks like the following:

val inputDStream = ...
val keyedDStream = inputDStream.map(...) // use sensorId as key
val stateDStream = keyedDStream.updateStateByKey[...](updateFunction)

Here's the question:
In a cluster with 10 worker nodes, is it possible to partition the input DStream so that node 1 handles sensors 0-999, node 2 handles 1000-1999, and so on?

Also, is it possible to keep the state stream for sensors 0-999 on node 1, 1000-1999 on node 2, etc.? Right now, I see the sensor state stream being shuffled for every batch, which uses a lot of network bandwidth and is unnecessary.

Any suggestions?

Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark streaming for synchronous API
Hi again,

On Tue, Sep 9, 2014 at 2:20 PM, Tobias Pfeiffer wrote:
>
> On Tue, Sep 9, 2014 at 2:02 PM, Ron's Yahoo! wrote:
>>
>> For example, let's say there's a particular topic T1 in a Kafka queue.
>> If I have a new set of requests coming from a particular client A, I was
>> wondering if I could create a partition A.
>> The streaming job is submitted to listen to T1.A and will write to a
>> topic T2.A, which the REST endpoint would be listening on.
>
> That doesn't seem like a good way to use Kafka. It may be possible, but I
> am pretty sure you should create a new topic T_A instead of a partition A
> in an existing topic. With some modifications of Spark Streaming's
> KafkaReceiver you *might* be able to get it to work as you imagine, but it
> was not meant to be that way, I think.

Maybe I was wrong about a new topic being the better way. Looking, for example, at the way that Samza consumes Kafka streams <http://samza.incubator.apache.org/learn/documentation/latest/introduction/concepts.html>, it seems like there is one task per partition, and data can go into partitions keyed by user ID. So maybe a new partition is actually the conceptually better way. Nonetheless, the built-in KafkaReceiver doesn't support assignment of partitions to receivers AFAIK ;-)

Tobias
Re: Spark EC2 standalone - Utils.fetchFile no such file or directory
I've encountered probably the same problem and just figured out the solution. The error is caused by Spark trying to write to its scratch directory when the path doesn't exist. It's likely you are running the app on the master node only.

In the spark-ec2 setup, the scratch directory for Spark (spark.local.dir) is set to /mnt/spark in conf/spark-env.sh. This path exists on all slave nodes but not on the master node, hence the error. So if you set the master URL to spark://your-master-node-domain:7077, the error will be gone, since all the executors run on slave nodes.

If you need to test on the master node, either create /mnt/spark yourself or change the entry (SPARK_LOCAL_DIRS) in conf/spark-env.sh to some existing path with write permission. Note that the environment variables defined in conf/spark-env.sh are meant for machine-specific settings, so they will override the settings in the SparkConf object even if you provided one.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-EC2-standalone-Utils-fetchFile-no-such-file-or-directory-tp12683p13848.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
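A minimal sketch of the two workarounds described above, to be applied on the master node (the /tmp path in option 2 is hypothetical; any existing writable directory works):

```shell
# Option 1: create the scratch directory that the spark-ec2 config
# expects (needs write permission on /mnt):
#   mkdir -p /mnt/spark
#
# Option 2: point Spark at an existing writable path instead,
# by setting this in conf/spark-env.sh (hypothetical path):
export SPARK_LOCAL_DIRS=/tmp/spark-scratch
```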
Re: Table not found: using jdbc console to query sparksql hive thriftserver
Thanks so much! That makes complete sense. However, when I compile, I get an error: "package org.apache.spark.sql.hive does not exist." Has anyone else seen this, and any idea why this might be so?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13847.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: spark.cleaner.ttl and spark.streaming.unpersist
Hi Luis,

The parameters "spark.cleaner.ttl" and "spark.streaming.unpersist" can both be used to remove stale streaming data; the difference is that "spark.cleaner.ttl" is a time-based cleaner, which cleans not only streaming input data but also Spark's stale metadata, while "spark.streaming.unpersist" is a reference-based cleaning mechanism, under which streaming data is removed once it falls out of the slide duration. Both of these parameters can reduce the memory occupied by Spark Streaming.

But if data floods into Spark Streaming at start-up, as in your situation with Kafka, these two parameters cannot do much to mitigate the problem. You actually need to control the input rate so data is not injected so fast; you can try "spark.streaming.receiver.maxRate" to limit the ingestion rate.

Thanks,
Jerry

From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
Sent: Wednesday, September 10, 2014 5:21 AM
To: user@spark.apache.org
Subject: spark.cleaner.ttl and spark.streaming.unpersist

The executors of my Spark Streaming application are being killed due to memory issues. The memory consumption is quite high on startup because it is the first run and there are quite a few events on the Kafka queues, which are consumed at a rate of 100K events per second. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. I also wonder whether new RDDs are being batched while an RDD is being processed.

Regards,
Luis
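The settings discussed above can be combined in conf/spark-defaults.conf, for example (a sketch; the values are illustrative, not recommendations -- tune them to your pipeline):

```properties
# conf/spark-defaults.conf -- illustrative values only (a sketch)
# Cap ingestion at 10K records/sec per receiver so the start-up
# backlog does not flood executor memory.
spark.streaming.receiver.maxRate  10000
# Drop batch RDDs once they fall out of the slide duration.
spark.streaming.unpersist         true
# Time-based cleanup of old RDDs and metadata, in seconds.
spark.cleaner.ttl                 3600
```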
Re: Table not found: using jdbc console to query sparksql hive thriftserver
Your tables were registered in the SQLContext, whereas the thrift server works with a HiveContext. They seem to be in two different worlds today.

On 9/9/14, 5:16 PM, "alexandria1101" wrote:
> Hi,
>
> I want to use the sparksql thrift server in my application and make sure
> everything is loading and working. I built Spark 1.1 SNAPSHOT and ran the
> thrift server using ./sbin/start-thrift-server. In my application I load
> tables into schemaRDDs, and I expect that the thrift server should pick
> them up. In the app I then perform SQL queries on a table called mutation
> (the same name as the table I registered from the schemaRDD).
> [...]
EOFException when reading from HDFS
I ran the SimpleApp program from the Spark tutorial (https://spark.apache.org/docs/1.0.0/quick-start.html), which works fine. However, if I change the file location from local to HDFS, I get an EOFException. Some searching online suggests this error is caused by Hadoop version conflicts; I made the suggested modification in my sbt file, but I still get the same error:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.3.0-cdh5.1.0"

I am using CDH 5.1; the full error message is below. Any help is greatly appreciated. Thanks.

[hdfs@plogs001 test1]$ spark-submit --class SimpleApp --master spark://172.16.30.164:7077 target/scala-2.10/simple-project_2.10-1.0.jar
14/09/09 16:56:41 INFO spark.SecurityManager: Changing view acls to: hdfs
14/09/09 16:56:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hdfs)
14/09/09 16:56:41 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/09 16:56:41 INFO Remoting: Starting remoting
14/09/09 16:56:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@plogs001.sjc.domain.com:34607]
14/09/09 16:56:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@plogs001.sjc.domain.com:34607]
14/09/09 16:56:41 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/09 16:56:41 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/09 16:56:41 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140909165641-375e
14/09/09 16:56:41 INFO storage.MemoryStore: MemoryStore started with capacity 294.9 MB.
14/09/09 16:56:41 INFO network.ConnectionManager: Bound socket to port 40833 with id = ConnectionManagerId(plogs001.sjc.domain.com,40833)
14/09/09 16:56:41 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/09/09 16:56:41 INFO storage.BlockManagerInfo: Registering block manager plogs001.sjc.domain.com:40833 with 294.9 MB RAM
14/09/09 16:56:41 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/09 16:56:41 INFO spark.HttpServer: Starting HTTP Server
14/09/09 16:56:42 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/09 16:56:42 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:47419
14/09/09 16:56:42 INFO broadcast.HttpBroadcast: Broadcast server started at http://172.16.30.161:47419
14/09/09 16:56:42 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-7026d0b6-777e-4dd3-9bbb-e79d7487e7d7
14/09/09 16:56:42 INFO spark.HttpServer: Starting HTTP Server
14/09/09 16:56:42 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/09 16:56:42 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:42388
14/09/09 16:56:42 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/09 16:56:42 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/09/09 16:56:42 INFO ui.SparkUI: Started SparkUI at http://plogs001.sjc.domain.com:4040
14/09/09 16:56:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/09 16:56:42 INFO spark.SparkContext: Added JAR file:/home/hdfs/kent/test1/target/scala-2.10/simple-project_2.10-1.0.jar at http://172.16.30.161:42388/jars/simple-project_2.10-1.0.jar with timestamp 1410307002737
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Connecting to master spark://plogs004.sjc.domain.com:7077...
14/09/09 16:56:42 INFO storage.MemoryStore: ensureFreeSpace(155704) called with curMem=0, maxMem=309225062
14/09/09 16:56:42 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 152.1 KB, free 294.8 MB)
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140909165642-0041
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor added: app-20140909165642-0041/0 on worker-20140902113555-plogs005.sjc.domain.com-7078 (plogs005.sjc.domain.com:7078) with 24 cores
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140909165642-0041/0 on hostPort plogs005.sjc.domain.com:7078 with 24 cores, 1024.0 MB RAM
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor added: app-20140909165642-0041/1 on worker-20140902113555-plogs006.sjc.domain.com-7078 (plogs006.sjc.domain.com:7078) with 24 cores
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140909165642-0041/1 on hostPort plogs006.sjc.domain.com:7078 with 24 cores, 1024.0 MB RAM
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor added: app-20140909165642-0041/2 on worker-20140902113556-plogs004.sjc.domain.com-7078 (plogs004.sjc.domain.com:7078) with 24 cores
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140909165642-0041/2 on hostPort plogs004.sjc.domain.com:7078 with 24 cores, 1024.0 MB RAM
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor updated: app-20140909165642-0041/2 is n
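For CDH artifacts like 2.3.0-cdh5.1.0, the build also needs Cloudera's repository on the resolver list, since those artifacts are not published to Maven Central. A sketch of the relevant build.sbt lines (the dependency is the one from the post; the resolver is Cloudera's standard repository):

```scala
// build.sbt (fragment) -- a sketch, not the poster's actual file.
// Cloudera's repository is needed to resolve *-cdhX.Y.Z artifacts.
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.3.0-cdh5.1.0"
```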
serialization changes -- OOM
Has anything changed in the last 30 days w.r.t. serialization? I had 620 MB of compressed data that used to get serialized in Spark memory with 4 GB of executor memory. Now it fails to get serialized in memory even with 10 GB of executor memory.

--
Bharath
Re: Cannot run SimpleApp as regular Java app
Hi Yana,

I added the following to spark-class:

echo RUNNER: $RUNNER
echo CLASSPATH: $CLASSPATH
echo JAVA_OPTS: $JAVA_OPTS
echo '$@': $@

Here's the output:

$ ./spark-submit --class experiments.SimpleApp --master spark://myhost.local:7077 /IdeaProjects/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
RUNNER: /Library/Java/JavaVirtualMachines/jdk1.7.0_13.jdk/Contents/Home/bin/java
CLASSPATH: ::/dev/spark-1.0.2-bin-hadoop2/conf:/dev/spark-1.0.2-bin-hadoop2/lib/spark-assembly-1.0.2-hadoop2.2.0.jar:/dev/spark-1.0.2-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/dev/spark-1.0.2-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/dev/spark-1.0.2-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar
JAVA_OPTS: -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
$@: org.apache.spark.deploy.SparkSubmit --class experiments.SimpleApp --master spark://myhost.local:7077 /IdeaProjects/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar

The differences I can see from the code that runs via my standalone Java app:

- It does not have -Djava.library.path= (should not make a difference).
- The main class is org.apache.spark.executor.CoarseGrainedExecutorBackend instead of org.apache.spark.deploy.SparkSubmit (should not make a difference).
- My jar's classes are directly available when running via spark-submit (it runs the jar, so they will be in the main classloader), but they are only available via conf.setJars() in the standalone Java app. Still, they should be available indirectly in the classloader that is created in the executor:

14/09/08 10:04:06 INFO Executor: Adding file:/dev/spark-1.0.2-bin-hadoop2/work/app-20140908100358-0002/1/./spark-experiments-1.0-SNAPSHOT.jar to class loader

I've been assuming that conf.setJars() is the proper way to provide my code to Spark. Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-run-SimpleApp-as-regular-Java-app-tp13695p13842.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
how to run python examples in spark 1.1?
I'm mostly interested in the HBase examples in the repo. I saw two examples, hbase_inputformat.py and hbase_outputformat.py, in the 1.1 branch. Can you show me how to run them? The compile step is done; I tried to run the examples, but failed.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-python-examples-in-spark-1-1-tp13841.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
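A sketch of how these examples are typically launched (the jar path and version are assumptions about your build, and the host/table arguments are placeholders):

```shell
# Run the HBase input example via spark-submit (a sketch).
# The examples jar built in the compile step must be on the driver
# classpath so the HBase input format and converters are found;
# adjust the jar path and version to your own build.
./bin/spark-submit \
  --driver-class-path ./examples/target/scala-2.10/spark-examples_2.10-1.1.0.jar \
  ./examples/src/main/python/hbase_inputformat.py \
  <zookeeper-host> <hbase-table-name>
```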
Table not found: using jdbc console to query sparksql hive thriftserver
Hi, I want to use the sparksql thrift server in my application and make sure everything is loading and working. I built Spark 1.1 SNAPSHOT and ran the thrift server using ./sbin/start-thrift-server. In my application I load tables into schemaRDDs and I expect that the thrift-server should pick them up. In the app I then perform SQL queries on a table called mutation (the same name as the table I registered from the schemaRDD). I set the driver to "org.apache.hive.jdbc.HiveDriver" and the url to "jdbc:hive2://localhost:1/mutation?zeroDateTimeBehavior=convertToNull". When I check the terminal for the thrift server output, it gets the query. However, I cannot use a jdbc console to communicate with it to show all of the databases and tables to see if mutation is loaded. I get the following errors: 14/09/09 16:51:02 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:444) at sun.nio.ch.Net.bind(Net.java:436) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.server.Server.doStart(Server.java:293) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:192) at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at 
org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:202) at org.apache.spark.ui.WebUI.bind(WebUI.scala:102) at org.apache.spark.SparkContext.(SparkContext.scala:224) at org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:53) at com.illumina.phoenix.util.Runner.createSparkContext(Runner.java:144) at com.illumina.phoenix.etl.EtlPipelineRunner.main(EtlPipelineRunner.java:116) 1053 [main] WARN org.eclipse.jetty.util.component.AbstractLifeCycle - FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:444) at sun.nio.ch.Net.bind(Net.java:436) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.server.Server.doStart(Server.java:293) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:192) at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:202) at org.apache.spark.ui.WebUI.bind(WebUI.scala:102) at org.apache.spark.SparkContext.(SparkContext.scala:224) at org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:53) at com.illumina.phoenix.util.Runner.createSparkContext(Runner.java:144) at com.illumina.phoenix.etl.EtlPipelineRunner.main(EtlPipelineRunner.java:116) 14/09/09 16:51:02 WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.server.Server@35241119: java.net.BindExcep
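For what it's worth, the BindException above is usually benign: 4040 is the default Spark web UI port, and when it is already taken (here, most likely by the Thrift server's own UI), Spark logs that WARN and retries the next port. Here is the retry idea sketched standalone in Python — port numbers and names are illustrative only, not Spark's actual code:

```python
import socket

def bind_with_retry(start_port, max_retries=16):
    """Bind to the first free port at or after start_port.

    Mirrors the behavior Spark's startServiceOnPort has for the web UI:
    when the preferred port is taken, a warning is logged and the next
    port is tried, so the app still comes up (on 4041, 4042, ...).
    """
    for offset in range(max_retries):
        port = start_port + offset
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind(("127.0.0.1", port))
            return sock, port
        except OSError:
            sock.close()  # port busy -- try the next one
    raise OSError("no free port in range")

# Occupy one port, then confirm a second bind skips past it.
blocker, p = bind_with_retry(4040)
retry_sock, p2 = bind_with_retry(p)
```

If the warnings are unwanted noise, the app's UI port can also be pointed at a known-free port explicitly via the `spark.ui.port` property.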
Spark + AccumuloInputFormat
Hi, I'm trying to execute Spark SQL queries on top of the AccumuloInputFormat. Not sure if I should be asking on the Spark list or the Accumulo list, but I'll try here. The problem is that the workload for processing SQL queries doesn't seem to be distributed across my cluster very well. My Spark SQL app is running in yarn-client mode. The query I'm running is "select count(*) from audit_log" (or a similarly simple query), where my audit_log table has 14.3M rows and 504M key-value pairs spread fairly evenly across 8 tablet servers. Looking at the Accumulo monitor app, I only ever see a maximum of 2 tablet servers with active scans. Since the data is spread across all the tablet servers, I hoped to see 8! I realize there are a lot of moving parts here, but I'd appreciate any advice about where to start looking. Using Spark 1.0.1 with Accumulo 1.6. Thanks! -Russ
Re: Crawler and Scraper with different priorities
Hi Sandeep, would you be interested in joining my open source project? https://github.com/tribbloid/spookystuff IMHO Spark is indeed not for general-purpose crawling, for which distributed jobs are highly homogeneous. But it is good enough for directional scraping, which involves heterogeneous input and deep graph following & extraction. Please drop me a line if you have a use case, as I'll try to integrate it as a feature. Yours Peng -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Crawler-Scraper-with-different-priorities-tp13645p13838.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Deregistered receiver for stream 0: Stopped by driver
When I stop Spark Streaming Context by calling stop(), I always get the following error: ERROR Deregistered receiver for stream 0: Stopped by driver class=org.apache.spark.streaming.scheduler.ReceiverTracker WARN Stopped executor without error class=org.apache.spark.streaming.receiver.ReceiverSupervisorImpl WARN All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,EventReceiver-0,null,false,localhost,Stopped by driver,)) class=org.apache.spark.streaming.scheduler.ReceiverTracker Is there a way to avoid these error and warning messages? Thanks, -Sing
Spark caching questions
Hi, users 1. Disk-based cache eviction policy? The same LRU? 2. What is the scope of a cached RDD? Does it survive the application? What happens if I run the Java app next time? Will the RDD be recreated or read from cache? If the answer is YES, then ... 3. Is there any way to invalidate a cached RDD automatically? RDD partitions? Some API like RDD.isValid()? 4. HadoopRDD is InputFormat-based. Some partitions (splits) may become invalid in cache. Can we reload only those partitions into cache? -Vladimir
Re: clarification for some spark on yarn configuration options
Hi Greg, SPARK_EXECUTOR_INSTANCES is the total number of workers in the cluster. The equivalent "spark.executor.instances" is just another way to set the same thing in your spark-defaults.conf. Maybe this should be documented. :) "spark.yarn.executor.memoryOverhead" is just an additional margin added to "spark.executor.memory" for the container. In addition to the executor's memory, the container in which the executor is launched needs some extra memory for system processes, and this is what this "overhead" (somewhat of a misnomer) is for. The same goes for the driver equivalent. "spark.driver.memory" behaves differently depending on which version of Spark you are using. If you are using Spark 1.1+ (this was released very recently), you can directly set "spark.driver.memory" and this will take effect. Otherwise, setting this doesn't actually do anything for client deploy mode, and you have two alternatives: (1) set the environment variable equivalent SPARK_DRIVER_MEMORY in spark-env.sh, and (2) if you are using Spark submit (or bin/spark-shell, or bin/pyspark, which go through bin/spark-submit), pass the "--driver-memory" command line argument. If you want your PySpark application (driver) to pick up extra class path, you can pass the "--driver-class-path" to Spark submit. If you are using Spark 1.1+, you may set "spark.driver.extraClassPath" in your spark-defaults.conf. There is also an environment variable you could set (SPARK_CLASSPATH), though this is now deprecated. Let me know if you have more questions about these options, -Andrew 2014-09-08 6:59 GMT-07:00 Greg Hill : > Is SPARK_EXECUTOR_INSTANCES the total number of workers in the cluster > or the workers per slave node? > > Is spark.executor.instances an actual config option? I found that in a > commit, but it's not in the docs. > > What is the difference between spark.yarn.executor.memoryOverhead and > spark.executor.memory > ? Same question for the 'driver' variant, but I assume it's the same > answer. 
> > Is there a spark.driver.memory option that's undocumented or do you have > to use the environment variable SPARK_DRIVER_MEMORY? > > What config option or environment variable do I need to set to get > pyspark interactive to pick up the yarn class path? The ones that work for > spark-shell and spark-submit don't seem to work for pyspark. > > Thanks in advance. > > Greg >
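To make the mapping concrete, here is how the options Andrew describes might look in spark-defaults.conf. All values and paths below are illustrative placeholders, not recommendations, and this assumes Spark 1.1+ (for the driver properties):

```properties
# spark-defaults.conf -- illustrative values only (Spark 1.1+)
# total number of executors across the cluster (YARN)
spark.executor.instances  8
# heap size per executor
spark.executor.memory  4g
# extra MB of YARN container headroom beyond the executor heap
spark.yarn.executor.memoryOverhead  384
# honored directly only in 1.1+; older versions need
# SPARK_DRIVER_MEMORY or --driver-memory as described above
spark.driver.memory  2g
# extra driver class path (1.1+); --driver-class-path otherwise
spark.driver.extraClassPath  /etc/hadoop/conf
```

Note that spark-defaults.conf entries are whitespace-separated key/value pairs, so each comment goes on its own line.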
Re: spark-streaming "Could not compute split" exception
Your executor is exiting or crashing unexpectedly: On Tue, Sep 9, 2014 at 3:13 PM, Penny Espinoza wrote: > org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit > code from container container_1410224367331_0006_01_03 is : 1 > 2014-09-09 21:47:26,345 WARN > org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: > Exception from container-launch with container ID: > container_1410224367331_0006_01_03 and exit code: 1 You can check the app logs (yarn logs --applicationId [id]) and see why the container is exiting. There's probably an exception happening somewhere. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Yarn Driver OOME (Java heap space) when executors request map output locations
Hey, If you are interested in more details there is also a thread about this issue here: http://apache-spark-developers-list.1001551.n3.nabble.com/Eliminate-copy-while-sending-data-any-Akka-experts-here-td7127.html Kostas On Tue, Sep 9, 2014 at 3:01 PM, jbeynon wrote: > Thanks Marcelo, that looks like the same thing. I'll follow the Jira ticket > for updates. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Driver-OOME-Java-heap-space-when-executors-request-map-output-locations-tp13827p13829.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: spark-streaming "Could not compute split" exception
The node manager log looks like this - not exactly sure what this means, but the container messages seem to indicate there is still plenty of memory. 2014-09-09 21:47:00,718 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26819 for container-id container_1410224367331_0006_01_03: 319.7 MB of 1.5 GB physical memory used; 1.7 GB of 7.5 GB virtual memory used 2014-09-09 21:47:03,728 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26819 for container-id container_1410224367331_0006_01_03: 321.2 MB of 1.5 GB physical memory used; 1.7 GB of 7.5 GB virtual memory used 2014-09-09 21:47:06,736 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26819 for container-id container_1410224367331_0006_01_03: 321.5 MB of 1.5 GB physical memory used; 1.7 GB of 7.5 GB virtual memory used 2014-09-09 21:47:09,743 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26819 for container-id container_1410224367331_0006_01_03: 321.5 MB of 1.5 GB physical memory used; 1.7 GB of 7.5 GB virtual memory used 2014-09-09 21:47:12,755 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26819 for container-id container_1410224367331_0006_01_03: 321.9 MB of 1.5 GB physical memory used; 1.7 GB of 7.5 GB virtual memory used 2014-09-09 21:47:15,762 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26819 for container-id container_1410224367331_0006_01_03: 322.8 MB of 1.5 GB physical memory used; 1.7 GB of 7.5 GB virtual memory used 2014-09-09 21:47:18,430 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Sending out status for container: container_id { 
app_attempt_id { application_id { id: 6 cluster_timestamp: 1410224367331 } attemptId: 1 } id: 3 } state: C_RUNNING diagnostics: "" exit_status: -1000 2014-09-09 21:47:18,769 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26819 for container-id container_1410224367331_0006_01_03: 322.8 MB of 1.5 GB physical memory used; 1.7 GB of 7.5 GB virtual memory used 2014-09-09 21:47:21,777 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26819 for container-id container_1410224367331_0006_01_03: 322.8 MB of 1.5 GB physical memory used; 1.7 GB of 7.5 GB virtual memory used 2014-09-09 21:47:24,784 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26819 for container-id container_1410224367331_0006_01_03: 324.1 MB of 1.5 GB physical memory used; 1.7 GB of 7.5 GB virtual memory used 2014-09-09 21:47:26,345 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1410224367331_0006_01_03 is : 1 2014-09-09 21:47:26,345 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception from container-launch with container ID: container_1410224367331_0006_01_03 and exit code: 1 org.apache.hadoop.util.Shell$ExitCodeException: at org.apache.hadoop.util.Shell.runCommand(Shell.java:464) at org.apache.hadoop.util.Shell.run(Shell.java:379) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:283) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:79) at 
java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 2014-09-09 21:47:26,345 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 2014-09-09 21:47:26,345 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Container exited with a non-zero exit code 1 2014-09-09 21:47:26,346 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1410224367331_0006_01_03 transitioned from RUNNING to EXITED_WITH_FAILURE 2014-09-09 21:47:26,346 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container
Re: Yarn Driver OOME (Java heap space) when executors request map output locations
Thanks Marcelo, that looks like the same thing. I'll follow the Jira ticket for updates. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Driver-OOME-Java-heap-space-when-executors-request-map-output-locations-tp13827p13829.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Yarn Driver OOME (Java heap space) when executors request map output locations
Hi, Yes, this is a problem, and I'm not aware of any simple workarounds (or complex one for that matter). There are people working to fix this, you can follow progress here: https://issues.apache.org/jira/browse/SPARK-1239 On Tue, Sep 9, 2014 at 2:54 PM, jbeynon wrote: > I'm running on Yarn with relatively small instances with 4gb memory. I'm not > caching any data but when the map stage ends and shuffling begins all of the > executors request the map output locations at the same time which seems to > kill the driver when the number of executors is turned up. > > For example, the "size of output statuses" is about 10mb and with 500 > executors the driver appears to be making 500 (5gb of data) copies of this > data to send out and running out of memory. When I turn down the number of > executors everything runs fine. > > Has anyone else run into this? Maybe I'm misunderstanding the underlying > cause. I don't have a copy of the stack trace handy but can recreate it if > necessary. It was somewhere in the for HeapByteBuffer. Any advice > would be helpful. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Driver-OOME-Java-heap-space-when-executors-request-map-output-locations-tp13827.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Yarn Driver OOME (Java heap space) when executors request map output locations
I'm running on Yarn with relatively small instances with 4gb memory. I'm not caching any data but when the map stage ends and shuffling begins all of the executors request the map output locations at the same time which seems to kill the driver when the number of executors is turned up. For example, the "size of output statuses" is about 10mb and with 500 executors the driver appears to be making 500 (5gb of data) copies of this data to send out and running out of memory. When I turn down the number of executors everything runs fine. Has anyone else run into this? Maybe I'm misunderstanding the underlying cause. I don't have a copy of the stack trace handy but can recreate it if necessary. It was somewhere in the for HeapByteBuffer. Any advice would be helpful. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Driver-OOME-Java-heap-space-when-executors-request-map-output-locations-tp13827.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark.cleaner.ttl and spark.streaming.unpersist
The executors of my Spark Streaming application are being killed due to memory issues. Memory consumption is quite high on startup because it is the first run and there are quite a few events on the Kafka queues, which are consumed at a rate of 100K events per sec. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. I also wonder whether new RDDs are being batched while an RDD is being processed. Regards, Luis
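For reference, the two settings mentioned would be combined in spark-defaults.conf along these lines (the values are illustrative placeholders, not a recommendation):

```properties
# spark-defaults.conf -- illustrative values, tune for your workload
# seconds after which old metadata and persisted RDDs are cleaned up
spark.cleaner.ttl  3600
# unpersist streaming RDDs once they are no longer needed
spark.streaming.unpersist  true
```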
Re: spark-streaming "Could not compute split" exception
This has all the symptoms of Yarn killing your executors due to them exceeding their memory limits. Could you check your RM/NM logs to see if that's the case? (The error was because of an executor at domU-12-31-39-0B-F1-D1.compute-1.internal, so you can check that NM's log file.) If that's the case, you'll need to play with the "spark.yarn.executor.memoryOverhead" config (see http://spark.apache.org/docs/latest/running-on-yarn.html). On Tue, Sep 9, 2014 at 1:13 PM, Penny Espinoza wrote: > Hey - I have a Spark 1.0.2 job (using spark-streaming-kafka) that runs > successfully using master = local[4]. However, when I run it on a Hadoop > 2.2 EMR cluster using master yarn-client, it fails after running for about 5 > minutes. My main method does something like this: > > gets streaming context > creates a DStream from KafkaUtils.createStream (let’s call this a) > creates another DStream from a.flatMap (let’s call this b) > creates another DStream from b.updateStateByKey (c) > creates another DStream from c.flatMap (d) > runs d.foreachRDD, and uses Tuplejump’s Calliope cql3SaveToCassandra to save > data to Cassandra > > > Here’s the error message: > > 14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to > java.lang.Exception > java.lang.Exception: Could not compute split, block input-0-1410293154000 > not found > at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > at > org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) > at > 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at org.apache.spark.scheduler.Task.run(Task.scala:51) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > > > According to logs, the block referenced in the error message was added to > memory only about 10 seconds before this error appears, and there is no > evidence in the log of any blocks being cleared from memory, or of it > running out of memory. Below is a snippet of the log: > > 14/09/09 20:05:54 INFO storage.BlockManagerInfo: Added input-0-1410293154000 > in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 214.4 > KB, free: 490.8 MB) > … ( several other messages like the one above, but for different blocks) > 14/09/09 20:05:58 INFO storage.BlockManagerInfo: Added input-0-1410293158000 > in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 3.1 MB, > free: 457.3 MB) > 14/09/09 20:06:00 INFO scheduler.ReceiverTracker: Stream 0 received 21 > blocks > 14/09/09 20:06:00 INFO dstream.StateDStream: Persisting RDD 57 for time > 141029316 ms to StorageLevel(false, true, false, false, 1) at time > 141029316 ms > 14/09/09 20:06:00 INFO dstream.StateDStream: Marking RDD 57 for time > 141029316 ms for checkpointing at time 141029316 ms > 14/09/09 20:06:00 INFO scheduler.JobScheduler: Added jobs for time > 141029316 ms > 14/09/09 20:06:00 INFO scheduler.JobGenerator: Checkpointing graph for time > 141029316 ms > 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updating checkpoint data for > time 141029316 ms > 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updated checkpoint data for > time 141029316 ms > 14/09/09 20:06:00 INFO scheduler.JobScheduler: Starting job streaming job > 141029316 ms.0 from job set of time 141029316 ms 
> 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Saving checkpoint for > time 141029316 ms to file > 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-141029316' > 14/09/09 20:06:00 INFO spark.SparkContext: Starting job: > saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203 > 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Registering RDD 54 (flatMap > at FlatMappedDStream.scala:35) > 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Got job 12 > (saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) with 2 output > partitions (allowLocal=false) > 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Final stage: Stage > 23(saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) > 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Parents of final stage: > List(Stage 24) > 14/09/09 20:06:00 INFO
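The memoryOverhead knob Marcelo mentions is set like any other Spark property, e.g. in spark-defaults.conf. The value below is a placeholder, in MB:

```properties
# spark-defaults.conf -- placeholder value; raise it if YARN keeps
# killing containers for exceeding their memory limit
spark.yarn.executor.memoryOverhead  384
```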
Spark HiveQL support plan
Hi, On the Spark website, there's a plan to support HiveQL on top of Spark SQL and also to support JDBC/ODBC. I would like to know whether, in this "HiveQL" supported by Spark (or in Spark SQL accessed through JDBC/ODBC), there is a plan to add extensions to leverage different Spark features like machine learning and streaming? I'm asking this as we're considering using Spark from a remote machine, so this would be easier through JDBC/ODBC. But it would be good to also take advantage of Spark features beyond the ones that exist in HiveQL. Thanks, Xiaohui Xue
Distributed Deep Learning Workshop with Scala, Akka, and Spark
On September 25-26, SF Scala teams up with Adam Gibson, the creator of deeplearning4j.org, to teach the first-ever Distributed Deep Learning with Scala, Akka, and Spark workshop. Deep Learning is enabling breakthrough advances in areas such as image recognition and natural language processing. It is a renaissance of neural networks, made possible by the vast quantities of data available to train them, so that they match human performance on a variety of tasks. Deep Learning is attracting a lot of interest across a variety of startups working in domains backed by "big" data, as it's one of the "easiest" ways to make sense of data -- once you put the algorithms and systems in place to make it possible. It turns out that in order to take advantage of the "big" data available, you need to scale out properly -- and this is where Scala, Akka, and Spark shine. We'll present a distributed-systems approach which makes you, armed with the Scala stack, a rival of Google. As we're offering this course for the first time, we're keeping the early-bird pricing longer for this workshop. Seating is very limited as we'll work collaboratively. Please sign up at http://bythebay.ticketleap.com/deep-learning-september-2014/ SF Scala is also offering the Typesafe-certified course, Fast Track to Scala: http://bythebay.ticketleap.com/fast-track-to-scala/ Scala underpins multiple machine learning stacks such as FACTORIE and ScalaNLP, and has fantastic numerical libraries such as Breeze, Spire, and Saddle. Scala is at the heart of Spark and BDAS. It is the choice of data scientists who want their prototypes to work at web-scale and be ready for production. This hands-on workshop will get you from zero to Scala hero in just two days. If you have any questions about these courses, please email train...@bythebay.io. A+ (Alexy) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: prepending jars to the driver class path for spark-submit on YARN
I finally seem to have gotten past this issue. Here’s what I did: * rather than using the binary distribution, I built Spark from scratch to eliminate the 4.1 version of org.apache.httpcomponents from the assembly * git clone https://github.com/apache/spark.git * cd spark * git checkout v1.0.2 * edited pom.xml to remove the modules sql/hive and examples * export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m” * mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package * rebuilt my own assembly, eliminating all exclusions I had previously included to force use of org.apache.httpcomponents 4.1 On Sep 8, 2014, at 12:03 PM, Penny Espinoza mailto:pesp...@societyconsulting.com>> wrote: I have tried using the spark.files.userClassPathFirst option (which, incidentally, is documented now, but marked as experimental), but it just causes different errors. I am using spark-streaming-kafka. If I mark spark-core and spark-streaming as provided and also exclude them from the spark-streaming-kafka dependency, I get this error: 14/09/08 18:34:23 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassCastException java.lang.ClassCastException: cannot assign instance of com.oncue.rna.realtime.streaming.spark.BaseKafkaExtractorJob$$anonfun$getEventsStream$1 to fie ld org.apache.spark.rdd.MappedRDD.f of type scala.Function1 in instance of org.apache.spark.rdd.MappedRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObje
spark-streaming "Could not compute split" exception
Hey - I have a Spark 1.0.2 job (using spark-streaming-kafka) that runs successfully using master = local[4]. However, when I run it on a Hadoop 2.2 EMR cluster using master yarn-client, it fails after running for about 5 minutes. My main method does something like this: 1. gets streaming context 2. creates a DStream from KafkaUtils.createStream (let’s call this a) 3. creates another DStream from a.flatMap (let’s call this b) 4. creates another DStream from b.updateStateByKey (c) 5. creates another DStream from c.flatMap (d) 6. runs d.foreachRDD, and uses Tuplejump’s Calliope cql3SaveToCassandra to save data to Cassandra Here’s the error message: 14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to java.lang.Exception java.lang.Exception: Could not compute split, block input-0-1410293154000 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) According to logs, the block referenced in the error message was added to memory only about 10 seconds before this error appears, and 
there is no evidence in the log of any blocks being cleared from memory, or of it running out of memory. Below is a snippet of the log: 14/09/09 20:05:54 INFO storage.BlockManagerInfo: Added input-0-1410293154000 in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 214.4 KB, free: 490.8 MB) … ( several other messages like the one above, but for different blocks) 14/09/09 20:05:58 INFO storage.BlockManagerInfo: Added input-0-1410293158000 in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 3.1 MB, free: 457.3 MB) 14/09/09 20:06:00 INFO scheduler.ReceiverTracker: Stream 0 received 21 blocks 14/09/09 20:06:00 INFO dstream.StateDStream: Persisting RDD 57 for time 141029316 ms to StorageLevel(false, true, false, false, 1) at time 141029316 ms 14/09/09 20:06:00 INFO dstream.StateDStream: Marking RDD 57 for time 141029316 ms for checkpointing at time 141029316 ms 14/09/09 20:06:00 INFO scheduler.JobScheduler: Added jobs for time 141029316 ms 14/09/09 20:06:00 INFO scheduler.JobGenerator: Checkpointing graph for time 141029316 ms 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updating checkpoint data for time 141029316 ms 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updated checkpoint data for time 141029316 ms 14/09/09 20:06:00 INFO scheduler.JobScheduler: Starting job streaming job 141029316 ms.0 from job set of time 141029316 ms 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Saving checkpoint for time 141029316 ms to file 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-141029316' 14/09/09 20:06:00 INFO spark.SparkContext: Starting job: saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Registering RDD 54 (flatMap at FlatMappedDStream.scala:35) 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Got job 12 (saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) with 2 output partitions (allowLocal=false) 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Final stage: Stage 
23(saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 24) 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Missing parents: List(Stage 24) 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting Stage 24 (FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35), which has no missing parents 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Deleting hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-141029311.bk 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Checkpoint for time 141029316 ms saved to file 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-141029316', took 4351 bytes and 55 ms 14/09/09 20:06:00 INFO streaming.D
Re: Spark processes not doing on killing corresponding YARN application
I figured out this issue (in our case)... and I'll vent a little in my reply here... =:) Fedora's well-intentioned firewall (firewall-cmd) requires you to open (enable) any port/service on a host that you need to connect to (including SSH/22, which is enabled by default, of course). So when launching client applications that use ephemeral ports to connect back to (as a Spark app does for remote YARN ResourceManager/NodeManagers to connect back to), you can't know what that port will be in order to enable it, unless the application lets you specify it as a launch property (which you can for Spark apps via -Dspark.driver.port="N"). Again, well intentioned, but always a pain. So... you have to either disable the firewall capability in Fedora, or open/enable a range of ports and tell your applications to use one of those. Also note that as of this writing, firewall-cmd's ability to port-forward from the HOST to GUESTS in libvirt/KVM-based Hadoop/YARN/HDFS test/dev clusters doesn't work (it never has; it's on the TODO list). It's another capability that you'll need in order to reach daemon ports running *inside* the KVM cluster (for example, UI ports). The work-around here (besides, again, disabling the Fedora firewall altogether) is to use same-subnet BRIDGING (not NAT-ting). Doing that will eliminate the need for port-forwarding (which, again, doesn't work). I've filed bugs in the past for this. So that is why YARN applications weren't terminating correctly for Spark apps, or for that matter working at all, since Spark uses ephemeral ports (by necessity). So whatever port your Spark application uses, remember to issue the command: user@driverHost$ sudo firewall-cmd --zone=public --add-port=<SparkAppPort>/tcp or, better yet, use a port-deterministic strategy mentioned earlier. (Hopefully the verbosity here will help someone in their future search.
Fedora aside, the original problem here can be network-related, as I discovered.) Sincerely, didata -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-processes-not-doing-on-killing-corresponding-YARN-application-tp13443p13819.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark on yarn history server + hdfs permissions issue
I am running Spark on Yarn with the HDP 2.1 technical preview. I'm having issues getting the spark history server permissions to read the spark event logs from hdfs. Both sides are configured to write/read logs from: hdfs:///apps/spark/events The history server is running as user spark, the jobs are running as user lavaqe. Both users are in the hdfs group on all the nodes in the cluster. That root logs folder is globally writeable, but owned by the spark user: drwxrwxrwx - spark hdfs 0 2014-09-09 18:19 /apps/spark/events All good so far. Spark jobs create subfolders and put their event logs in there just fine. The problem is that the history server, running as the spark user, cannot read those logs. They're written as the user that initiates the job, but still in the same hdfs group: drwxrwx--- - lavaqe hdfs 0 2014-09-09 19:24 /apps/spark/events/spark-pi-1410290714996 The files are group readable/writable, but this is the error I get: Permission denied: user=spark, access=READ_EXECUTE, inode="/apps/spark/events/spark-pi-1410290714996":lavaqe:hdfs:drwxrwx--- So, two questions, I guess: 1. Do group permissions just plain not work in hdfs or am I missing something? 2. Is there a way to tell Spark to log with more permissive permissions so the history server can read the generated logs? Greg
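On question 1: group permissions do work in HDFS; the check follows the POSIX owner/group/other model, so with mode drwxrwx--- a reader whose groups include hdfs should pass READ_EXECUTE. One common gotcha (offered here as a hypothesis, not a confirmed diagnosis) is that HDFS resolves a user's group membership on the NameNode host, not on the client, so having spark in the hdfs group only on the worker nodes is not enough. The plain-Python sketch below illustrates the permission check itself; the function name and inputs are illustrative, not Hadoop APIs:

```python
# Sketch of POSIX-style permission evaluation as HDFS applies it.
# check_access and its arguments are illustrative, not Hadoop code.

def check_access(mode, owner, group, user, user_groups, want):
    """mode: octal permission bits; want: bitmask with R=4, W=2, X=1."""
    if user == owner:
        bits = (mode >> 6) & 0o7          # owner bits
    elif group in user_groups:
        bits = (mode >> 3) & 0o7          # group bits
    else:
        bits = mode & 0o7                 # other bits
    return bits & want == want

R, W, X = 4, 2, 1
# /apps/spark/events/spark-pi-1410290714996: drwxrwx--- lavaqe:hdfs
mode = 0o770

# If the NameNode resolves user "spark" into group "hdfs",
# READ_EXECUTE on the directory succeeds:
assert check_access(mode, "lavaqe", "hdfs", "spark", {"hdfs"}, R | X)

# If group resolution (on the NameNode) does NOT include "hdfs",
# you get exactly the AccessControlException quoted above:
assert not check_access(mode, "lavaqe", "hdfs", "spark", {"spark"}, R | X)
```

If the group bits really are being honored, checking `hdfs groups spark` on the NameNode host would be a sensible next step.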
Re: streaming: code to simulate a network socket data source
I ran this code separately, but the program blocks at this line: val socket = listener.accept() Do you have any suggestions? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-code-to-simulate-a-network-socket-data-source-tp3431p13817.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Problem in running mosek in spark cluster - java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738)
If that library has native dependencies you'd need to make sure that the native code is on all boxes and in the path with export SPARK_LIBRARY_PATH=... On Tue, Sep 9, 2014 at 10:17 AM, ayandas84 wrote: > We have a small apache spark cluster of 6 computers. We are trying to > solve > a distributed problem which requires solving a optimization problem at each > machine during a spark map operation. > > We decided to use mosek as the solver and I collected an academic license > to > this end. We observed that mosek works fine in a single system. However, > when we prepare a jar file, include the mosek.jar into the library and try > to run the jar in the cluster as a spark job it gives errors. > > java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path > > Does this problem has any thing to do with the license? We have set the > necessary path variables i n the profile of the user in the master machine > but we are not sure about what changes needs to be made to the other > machines in the cluster. > > We shall be greatly obliged if you please suggest the necessary solution > and > help us out. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-running-mosek-in-spark-cluster-java-lang-UnsatisfiedLinkError-no-mosekjava7-0-in-java-lib-tp13799.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: Cannot run SimpleApp as regular Java app
spark-submit is a script which calls spark-class script. Can you output the command that spark-class runs (say, by putting set -x before the very last line?). You should see the java command that is being run. The scripts do some parameter setting so it's possible you're missing something. It seems to me you think your worker memory is 2G but the executor is clearly launched with -Xms512M" "-Xmx512M"...so that's all you'd get. On Mon, Sep 8, 2014 at 10:16 AM, ericacm wrote: > Dear all: > > I am a brand new Spark user trying out the SimpleApp from the Quick Start > page. > > Here is the code: > > object SimpleApp { > def main(args: Array[String]) { > val logFile = "/dev/spark-1.0.2-bin-hadoop2/README.md" // Should be > some > file on your system > val conf = new SparkConf() > .setAppName("Simple Application") > .set("spark.executor.memory", "512m") > .setMaster("spark://myhost.local:7077") > > > .setJars(Seq("/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar")) > val sc = new SparkContext(conf) > try { > val logData = sc.textFile(logFile, 2).cache() > val numAs = logData.filter(line => line.contains("a")).count() > val numBs = logData.filter(line => line.contains("b")).count() > println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) > } finally { > sc.stop() > } > } > } > > I am using Spark 1.0.2 and Scala 2.10.4. In spark-env.sh I have > SPARK_WORKER_MEMORY=2g. > > I am trying to run this as a standalone Java app in my IDE. 
> > Note that this code *does* work when I either > - Change the master to "local" (works running from IDE) > - Run it using spark-submit > > The application/driver log is: > > 14/09/08 10:03:55 INFO spark.SecurityManager: Changing view acls to: eric > 14/09/08 10:03:55 INFO spark.SecurityManager: SecurityManager: > authentication disabled; ui acls disabled; users with view permissions: > Set(eric) > 14/09/08 10:03:56 INFO slf4j.Slf4jLogger: Slf4jLogger started > 14/09/08 10:03:56 INFO Remoting: Starting remoting > 14/09/08 10:03:56 INFO Remoting: Remoting started; listening on addresses > :[akka.tcp://spark@10.0.1.5:61645] > 14/09/08 10:03:56 INFO Remoting: Remoting now listens on addresses: > [akka.tcp://spark@10.0.1.5:61645] > 14/09/08 10:03:56 INFO spark.SparkEnv: Registering MapOutputTracker > 14/09/08 10:03:56 INFO spark.SparkEnv: Registering BlockManagerMaster > 14/09/08 10:03:56 INFO storage.DiskBlockManager: Created local directory at > > /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-local-20140908100356-2496 > 14/09/08 10:03:56 INFO storage.MemoryStore: MemoryStore started with > capacity 279.5 MB. 
> 14/09/08 10:03:56 INFO network.ConnectionManager: Bound socket to port > 61646 > with id = ConnectionManagerId(10.0.1.5,61646) > 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Trying to register > BlockManager > 14/09/08 10:03:56 INFO storage.BlockManagerInfo: Registering block manager > 10.0.1.5:61646 with 279.5 MB RAM > 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Registered BlockManager > 14/09/08 10:03:56 INFO spark.HttpServer: Starting HTTP Server > 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031 > 14/09/08 10:03:57 INFO server.AbstractConnector: Started > SocketConnector@0.0.0.0:61647 > 14/09/08 10:03:57 INFO broadcast.HttpBroadcast: Broadcast server started at > http://10.0.1.5:61647 > 14/09/08 10:03:57 INFO spark.HttpFileServer: HTTP File server directory is > > /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-d5637279-5caa-4c14-a00f-650f1dd915bc > 14/09/08 10:03:57 INFO spark.HttpServer: Starting HTTP Server > 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031 > 14/09/08 10:03:57 INFO server.AbstractConnector: Started > SocketConnector@0.0.0.0:61648 > 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031 > 14/09/08 10:03:57 INFO server.AbstractConnector: Started > SelectChannelConnector@0.0.0.0:4040 > 14/09/08 10:03:57 INFO ui.SparkUI: Started SparkUI at http://10.0.1.5:4040 > 2014-09-08 10:03:57.567 java[58736:1703] Unable to load realm info from > SCDynamicStore > 14/09/08 10:03:57 INFO spark.SparkContext: Added JAR > /spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar at > http://10.0.1.5:61648/jars/spark-experiments-1.0-SNAPSHOT.jar with > timestamp > 1410185037723 > 14/09/08 10:03:57 INFO client.AppClient$ClientActor: Connecting to master > spark://myhost.local:7077... 
> 14/09/08 10:03:57 INFO storage.MemoryStore: ensureFreeSpace(32960) called > with curMem=0, maxMem=293063884 > 14/09/08 10:03:57 INFO storage.MemoryStore: Block broadcast_0 stored as > values to memory (estimated size 32.2 KB, free 279.5 MB) > 14/09/08 10:03:58 INFO cluster.SparkDeploySchedulerBackend: Connected to > Spark cluster with app ID app-20140908100358-0002 > 14/09/08 10:03:58 INFO client.AppClient$ClientActor: Executor added: > app-20140908100358-0002/0 on worker-20140908100129-10.0.1.5-61526 > (10.0.1.5:61526) with 8 cor
Re: streaming: code to simulate a network socket data source
Hello Diana, How can I include this implementation in my code, i.e. how can I start this task together with NetworkWordCount? In my case, I have a directory with several files. Then, I include this line: StreamingDataGenerator.streamingGenerator(NetPort, BytesSecond, DirFiles) But the program stays in my loop over the files, and only afterwards returns to NetworkWordCount. Can you suggest something to start these tasks in parallel? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-code-to-simulate-a-network-socket-data-source-tp3431p13814.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
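One way to run the generator together with NetworkWordCount is to put the socket server in its own thread, so that accept() blocks in that thread instead of the main program. A minimal Python sketch with plain sockets and threading (independent of Spark; the helper name and the ephemeral-port choice are illustrative):

```python
import socket
import threading

def start_source(lines):
    """Start a background socket server streaming `lines`; return its port."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(("127.0.0.1", 0))        # port 0: let the OS pick a free port
    srv.listen(1)
    port = srv.getsockname()[1]

    def serve():
        conn, _ = srv.accept()        # blocks, but only in this thread
        for line in lines:
            conn.sendall((line + "\n").encode())
        conn.close()
        srv.close()

    threading.Thread(target=serve, daemon=True).start()
    return port

port = start_source(["hello world", "hello spark"])

# In a real job this would be ssc.socketTextStream("127.0.0.1", port);
# here a plain client stands in for the consumer:
cli = socket.create_connection(("127.0.0.1", port))
data = b""
while True:
    chunk = cli.recv(1024)
    if not chunk:                     # server closed: all lines delivered
        break
    data += chunk
cli.close()
received = data.decode().splitlines()
print(received)                       # ['hello world', 'hello spark']
```

Because listen() is called before the thread starts, the client can connect even before accept() runs; the connection just waits in the backlog. The same pattern applies to the earlier question about the program blocking at listener.accept().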
Re: PySpark on Yarn - how group by data properly
On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets wrote: > Hi , > >I came from map/reduce background and try to do quite trivial thing: > > I have a lot of files ( on hdfs ) - format is : > >1 , 2 , 3 >2 , 3 , 5 >1 , 3, 5 > 2, 3 , 4 > 2 , 5, 1 > > I am actually need to group by key (first column) : > key values > 1 --> (2,3),(3,5) > 2 --> (3,5),(3,4),(5,1) > > and I need to process (pass) values to the function f ( my custom > function) > outcome of function f() should be to hdfs with corresponding key: > 1 --> f() outcome > 2 --> f() outcome. > > My code is : > > def doSplit(x): > y = x.split(',') > if(len(y)==3): >return y[0],(y[1],y[2]) > > > lines = sc.textFile(filename,1) > counts = lines.map(doSplit).groupByKey() > output = counts.collect() > > for (key, value) in output: > print 'build model for key ->' , key > print value > f(str(key) , value)) > > > Questions: >1) lines.map(doSplit).groupByKey() - I didn't find the option to use > groupByKey( f() ) to process grouped values? how can I process grouped keys > by custom function? function f has some not trivial logic. The result of groupByKey() is still an RDD of (key, ResultIterable(values)) pairs, so you can continue to call map() or mapValues() on it: lines.map(doSplit).groupByKey().map(f) But your `f` needs two parameters, and map() will assume that `f` takes one parameter, so you need to build a wrapper for `f`: lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs)) If `f` only accepts the values as a list, then you need to convert `vs` into a list: result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs))) Finally, you can save the `result` into HDFS: result.saveAsPickleFile(path, batch=1024) > 2) Using output ( I really don't like this approach ) to pass to > function looks like not scalable and executed only on one machine? What is > the way using PySpark process grouped keys in distributed fashion. > Multiprocessing and on different machine of the cluster.
> > 3)In case of processing output how data can be stored on hdfs? Currently, it's not easy to access files in HDFS; you could do it with sc.parallelize(local_data).map(str).saveAsTextFile() > Thanks > Oleg. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
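Outside Spark, the grouping semantics described in this answer can be checked with plain Python; do_split and f below are local stand-ins mirroring the user's doSplit and custom function (f here just counts values, as a placeholder for the real logic):

```python
from collections import defaultdict

raw = ["1 , 2 , 3", "2 , 3 , 5", "1 , 3, 5", "2, 3 , 4", "2 , 5, 1"]

def do_split(line):
    """Local stand-in for doSplit: '1 , 2 , 3' -> ('1', ('2', '3'))."""
    y = [s.strip() for s in line.split(",")]
    if len(y) == 3:
        return y[0], (y[1], y[2])

# Equivalent of lines.map(doSplit).groupByKey(): collect values per key
groups = defaultdict(list)
for kv in map(do_split, raw):
    if kv is not None:                # malformed lines yield None; drop them
        k, v = kv
        groups[k].append(v)

def f(key, values):                   # placeholder for the custom function
    return (key, len(values))

# Equivalent of .map(lambda (k, vs): f(k, list(vs))) in the answer above
result = sorted(f(k, vs) for k, vs in groups.items())
print(result)                         # [('1', 2), ('2', 3)]
```

This also shows why doSplit should handle malformed lines explicitly: in PySpark, a None returned for a bad line would flow into groupByKey.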
Re: Accuracy hit in classification with Spark
I have also run some tests on the other algorithms available in MLlib but got dismal accuracy. Is the method of creating the LabeledPoint RDD different for other algorithms, such as LinearRegressionWithSGD? Any help is appreciated. - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p13812.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Accuracy hit in classification with Spark
Thanks for the information, Xiangrui. I am using the following example to classify documents. http://chimpler.wordpress.com/2014/06/11/classifiying-documents-using-naive-bayes-on-apache-spark-mllib/ I am not sure if this is the best way to convert textual data into vectors. Can you please confirm whether this is the ideal solution, as I could not identify any shortcomings. Also, I am splitting the data into 70/30 sets, which is the same for Mahout, so it should not have an impact on accuracy. Thanks, Jatin - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p13811.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
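One standard way to turn documents into fixed-length vectors is the hashing trick, which is what MLlib's HashingTF implements; whether it beats the dictionary-based approach in the linked post depends on the data, so this is offered as an alternative to compare, not as the confirmed cause of the accuracy gap. A plain-Python sketch of the idea, with a deliberately tiny feature space (real settings use something like 2^20):

```python
def hashing_tf(terms, num_features=16):
    """Map terms to a fixed-length term-frequency vector via hashing.
    Collisions are possible and accepted; num_features trades them off."""
    vec = [0.0] * num_features
    for t in terms:
        vec[hash(t) % num_features] += 1.0
    return vec

doc = "spark spark mllib naive bayes".split()
v = hashing_tf(doc)

assert sum(v) == len(doc)   # every term lands in exactly one bucket
assert max(v) >= 2.0        # the repeated term "spark" stacks up
```

Note that Python's built-in hash of strings is randomized across processes, so bucket positions differ run to run; within one run the counts are stable, which is all the sketch needs.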
Re: RDD memory questions
On Tue, Sep 9, 2014 at 10:07 AM, Boxian Dong wrote: > I currently working on a machine learning project, which require the RDDs' > content to be (mostly partially) updated during each iteration. Because the > program will be converted directly from "traditional" python object-oriented > code, the content of the RDD will be modified in the mapping function. To > test the functionality and memory , I writed a testing program: > >class TestClass(object): > def __init__(self): > self.data = [] > > def setup(self): > self.data = range(2) > return self > > def addNumber(self, number): > length = len(self.data) > for i in range(length): > self.data[i] += number > return self > > def sumUp(self): > totoal = 0 > for n in self.data: > totoal += n > return totoal > > and Spark main: > > origData = [] > for i in range(50): > origData.append((i, TestClass())) > # create the RDD and cache it > rddData = sc.parallelize(origData).mapValues(lambda v : v.setup()) > rddData.cache() > > # modifying the content of RDD in map function > scD = rddData > for i in range(100): > scD = scD.mapValues(lambda v : v.addNumber(10)) > > scDSum = scD.map(lambda (k, v) : v.sumUp()) > v = scDSum.reduce(lambda a, b: a + b) > > print " -- after the transfermation, the value is ", v > > scDSum = rddData .map(lambda (k, v) : v.sumUp()) > v = scDSum.reduce(lambda a, b: a + b) > > print " -- after the transformation, the cached value is ", v > > - By judging the results, it seems to me that when the RDDs is cached, the > directed modification doesn't affect the content > - By the monitoring of the memory usage, it seems to me that the memory > are not duplicated during each RDD (narrow dependence) transformation (or I > am wrong) > > therefore, my question is: > - how the cache works, does it make a copy of the data separately ? > - How the memory is managed in the MAP function? 
(in narrow dependence) > Are the entire RDDs first duplicated, modified and then assigned to the new > RDDs, and afterward the old RDDs will be deleted from the memory. Or the new > RDDs will reuse the same memory of the old RDDs, without the > duplication/copy of the memory? I'm trying to answer some of your questions: The RDD is cached in the JVM (after being serialized by pickle). In Python, it reads the serialized data from a socket, deserializes it into Python objects, calls the mapper or reducer on them, and finally sends the results back to the JVM via the socket. The Python process only holds a batch of objects, so the memory usage will be smaller than the whole partition. The cache in the JVM is created after the first iteration over the data. So when you process it a second time (or more), it will be read from the cache in the JVM. RDDs are read-only; you cannot modify them, and each transformation creates new RDDs. During a MAP function, the objects in the RDDs are thrown away after being accessed; any modification to them will be lost. > - If the new generated RDDs directly use the memory of the old RDDs (in > narrow dependence) , why the cached RDDs still reserve old content. Are the > cached RDDs treated differently from uncached RDDs in memory management. There are no two RDDs sharing any memory; they are totally separate. > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
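The point above, that a cached RDD holds its own serialized copy unaffected by later mutation of the Python objects, can be mimicked without Spark using pickle (this only illustrates the caching behavior described in the answer; it is not Spark code):

```python
import pickle

class TestClass:
    """Same shape as the TestClass in the question, trimmed down."""
    def __init__(self):
        self.data = list(range(2))        # [0, 1]

    def add_number(self, n):
        self.data = [x + n for x in self.data]
        return self

obj = TestClass()
cached = pickle.dumps(obj)                # "cache": a serialized snapshot

obj.add_number(10)                        # a later "transformation" mutates
assert obj.data == [10, 11]               # the live object changed...

restored = pickle.loads(cached)           # ...but reading the cache back
assert restored.data == [0, 1]            # returns the untouched snapshot
print(restored.data, obj.data)
```

This is why the cached rddData in the test program still sums to the original value: each read of the cache deserializes a fresh copy, and mutations made inside mapValues never reach it.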
Re: groupBy gives non deterministic results
Which version of Spark are you using? This bug had been fixed in 0.9.2, 1.0.2 and 1.1, could you upgrade to one of these versions to verify it? Davies On Tue, Sep 9, 2014 at 7:03 AM, redocpot wrote: > Thank you for your replies. > > More details here: > > The prog is executed on local mode (single node). Default env params are > used. > > The test code and the result are in this gist: > https://gist.github.com/coderh/0147467f0b185462048c > > Here is 10 first lines of the data: 3 fields each row, the delimiter is ";" > > 3801959;11775022;118 > 3801960;14543202;118 > 3801984;11781380;20 > 3801984;13255417;20 > 3802003;11777557;91 > 3802055;11781159;26 > 3802076;11782793;102 > 3802086;17881551;102 > 3802087;19064728;99 > 3802105;12760994;99 > ... > > There are 27 partitions(small files). Total size is about 100 Mb. > > We find that this problem is highly probably caused by the bug SPARK-2043: > https://issues.apache.org/jira/browse/SPARK-2043 > > Could someone give more details on this bug ? > > The pull request say: > > The current implementation reads one key with the next hash code as it > finishes reading the keys with the current hash code, which may cause it to > miss some matches of the next key. This can cause operations like join to > give the wrong result when reduce tasks spill to disk and there are hash > collisions, as values won't be matched together. This PR fixes it by not > reading in that next key, using a peeking iterator instead. > > I don't understand why reading a key with the next hash code will cause it > to miss some matches of the next key. If someone could show me some code to > dig in, it's highly appreciated. =) > > Thank you. > > Hao. > > > > > > > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13797.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. 
> > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Filter function problem
Hi, val test = persons.value .map{tuple => (tuple._1, tuple._2 .filter{event => *inactiveIDs.filter(event2 => event2._1 == tuple._1).count() != 0*})} Your problem is right between the asterisks. You can't make an RDD operation inside another RDD operation, because RDDs can't be serialized. Therefore you are receiving the NullPointerException. Try joining the RDDs based on `event` and then filtering based on that. Best, Burak - Original Message - From: "Blackeye" To: u...@spark.incubator.apache.org Sent: Tuesday, September 9, 2014 3:34:58 AM Subject: Re: Filter function problem In order to help anyone answer, I could say that I checked the inactiveIDs.filter operation separately and found that it doesn't return null in any case. In addition, I don't know how to handle (or check) whether an RDD is null. I find the debugging too complicated to pinpoint the error. Any ideas how to find the null pointer? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Filter-function-problem-tp13787p13789.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
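The "collect the small RDD once, then filter against it" shape of this advice can be sketched with plain Python collections; inactive_ids below stands in for inactiveIDs.collect() shipped to the workers as a broadcast value, and the data and exact filter semantics are simplified for illustration:

```python
# persons: (person_id, [events]); inactive: records keyed by person_id.
persons = [(1, ["a", "b"]), (2, ["c"]), (3, ["d", "e"])]
inactive = [(1, "x"), (3, "y")]

# Instead of running inactiveIDs.filter(...) inside the map over persons
# (a nested RDD operation, which fails), materialize the inactive keys
# once -- in real Spark: sc.broadcast(set of inactiveIDs.keys().collect()).
inactive_ids = {pid for pid, _ in inactive}

# Now the per-person work is a plain membership test against a local set:
test = [(pid, events) for pid, events in persons if pid in inactive_ids]
print(test)                           # [(1, ['a', 'b']), (3, ['d', 'e'])]
```

The same membership test can also be expressed as a join of the two datasets when the inactive side is too large to collect.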
Re: Filter function problem
You should not be broadcasting an RDD. You also should not be passing an RDD in a lambda to another RDD. If you want, you can call RDD.collect and then broadcast those values (of course, you must be able to fit all those values in memory). On Tue, Sep 9, 2014 at 6:34 AM, Blackeye wrote: > In order to help anyone to answer i could say that i checked the > inactiveIDs.filter operation seperated, and I found that it doesn't return > null in any case. In addition i don't how to handle (or check) whether a > RDD > is null. I find the debugging to complicated to point the error. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Filter-function-problem-tp13787p13789.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: If for YARN you use 'spark.yarn.jar', what is the LOCAL equivalent to that property ...
Yes, that's how file: URLs are interpreted everywhere in Spark. (It's also explained in the link to the docs I posted earlier.) The second interpretation below is "local:" URLs in Spark, but that doesn't work with Yarn on Spark 1.0 (so it won't work with CDH 5.1 and older either). On Mon, Sep 8, 2014 at 6:00 PM, Dimension Data, LLC. < subscripti...@didata.us> wrote: > Even when using 'file:///...' nomenclature in SPARK_JAR (instead of > through the yet-to-be implemented 'spark.yarn.jar' > property), its interpretation still seems to be: > > 'Tell me where the local spark jar is located so that I can upload it > (i.e. hdfs dfs -put) to a HDFS staging area for you'. > -as opposed to- > 'Tell me where the local spark jar is located on the NMs, and I will > look for it at that UNIX path'. > -- Marcelo
RDD memory questions
I am currently working on a machine learning project, which requires the RDDs' content to be (mostly partially) updated during each iteration. Because the program will be converted directly from "traditional" Python object-oriented code, the content of the RDD will be modified in the mapping function. To test the functionality and memory usage, I wrote a test program: class TestClass(object): def __init__(self): self.data = [] def setup(self): self.data = range(2) return self def addNumber(self, number): length = len(self.data) for i in range(length): self.data[i] += number return self def sumUp(self): total = 0 for n in self.data: total += n return total and the Spark main: origData = [] for i in range(50): origData.append((i, TestClass())) # create the RDD and cache it rddData = sc.parallelize(origData).mapValues(lambda v : v.setup()) rddData.cache() # modifying the content of RDD in map function scD = rddData for i in range(100): scD = scD.mapValues(lambda v : v.addNumber(10)) scDSum = scD.map(lambda (k, v) : v.sumUp()) v = scDSum.reduce(lambda a, b: a + b) print " -- after the transformation, the value is ", v scDSum = rddData .map(lambda (k, v) : v.sumUp()) v = scDSum.reduce(lambda a, b: a + b) print " -- after the transformation, the cached value is ", v - Judging by the results, it seems to me that when the RDD is cached, the direct modification doesn't affect the content - From monitoring the memory usage, it seems to me that the memory is not duplicated during each RDD (narrow dependence) transformation (or I am wrong) therefore, my questions are: - how does the cache work; does it make a separate copy of the data? - How is the memory managed in the MAP function? (in narrow dependence) Are the entire RDDs first duplicated, modified and then assigned to the new RDDs, with the old RDDs afterwards deleted from memory? Or will the new RDDs reuse the same memory as the old RDDs, without duplicating/copying it?
- If the newly generated RDDs directly use the memory of the old RDDs (in narrow dependence), why do the cached RDDs still preserve the old content? Are the cached RDDs treated differently from uncached RDDs in memory management? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
PySpark on Yarn - how group by data properly
Hi, I come from a map/reduce background and am trying to do quite a trivial thing: I have a lot of files (on hdfs) - the format is: 1 , 2 , 3 2 , 3 , 5 1 , 3, 5 2, 3 , 4 2 , 5, 1 I need to group by key (the first column): key values 1 --> (2,3),(3,5) 2 --> (3,5),(3,4),(5,1) and I need to process (pass) the values to the function f (my custom function); the outcome of function f() should be written to hdfs with the corresponding key: 1 --> f() outcome 2 --> f() outcome. My code is: def doSplit(x): y = x.split(',') if(len(y)==3): return y[0],(y[1],y[2]) lines = sc.textFile(filename,1) counts = lines.map(doSplit).groupByKey() output = counts.collect() for (key, value) in output: print 'build model for key ->' , key print value f(str(key), value) Questions: 1) lines.map(doSplit).groupByKey() - I didn't find an option to use groupByKey( f() ) to process the grouped values? How can I process grouped keys with a custom function? Function f has some non-trivial logic. 2) Using output (I really don't like this approach) to pass to the function looks not scalable and is executed only on one machine? What is the way, using PySpark, to process grouped keys in a distributed fashion, with multiprocessing and on different machines of the cluster? 3) In the case of processing output, how can the data be stored on hdfs? Thanks Oleg.
Re: Querying a parquet file in s3 with an ec2 install
Okay, this seems to be either a code-version issue or a communication issue. It works if I execute the spark shell from the master node. It doesn't work if I run it from my laptop and connect to the master node. I had opened the ports for the WebUI (8080) and the cluster manager (7077) for the master node, otherwise it fails much sooner. Do I need to open up the ports for the workers as well? I used the spark-ec2 install script with --spark-version using both 1.0.2 and then again with the git hash tag that corresponds to 1.1.0rc4 (2f9b2bd7844ee8393dc9c319f4fefedf95f5e460). In both cases I rebuilt from source using the same codebase on my machine and moved the entire project into /root/spark (since to run the spark-shell it needs to match the same path as the install on ec2). Could I have missed something here? Thanks. Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13802.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Huge matrix
Better to do it in a PR of your own, it's not sufficiently related to dimsum On Tue, Sep 9, 2014 at 7:03 AM, Debasish Das wrote: > Cool...can I add loadRowMatrix in your PR ? > > Thanks. > Deb > > On Tue, Sep 9, 2014 at 1:14 AM, Reza Zadeh wrote: > >> Hi Deb, >> >> Did you mean to message me instead of Xiangrui? >> >> For TS matrices, dimsum with positiveinfinity and computeGramian have the >> same cost, so you can do either one. For dense matrices with say, 1m >> columns this won't be computationally feasible and you'll want to start >> sampling with dimsum. >> >> It would be helpful to have a loadRowMatrix function, I would use it. >> >> Best, >> Reza >> >> On Tue, Sep 9, 2014 at 12:05 AM, Debasish Das >> wrote: >> >>> Hi Xiangrui, >>> >>> For tall skinny matrices, if I can pass a similarityMeasure to >>> computeGrammian, I could re-use the SVD's computeGrammian for similarity >>> computation as well... >>> >>> Do you recommend using this approach for tall skinny matrices or just >>> use the dimsum's routines ? >>> >>> Right now RowMatrix does not have a loadRowMatrix function like the one >>> available in LabeledPoint...should I add one ? I want to export the matrix >>> out from my stable code and then test dimsum... >>> >>> Thanks. >>> Deb >>> >>> >>> >>> On Fri, Sep 5, 2014 at 9:43 PM, Reza Zadeh wrote: >>> I will add dice, overlap, and jaccard similarity in a future PR, probably still for 1.2 On Fri, Sep 5, 2014 at 9:15 PM, Debasish Das wrote: > Awesome...Let me try it out... > > Any plans of putting other similarity measures in future (jaccard is > something that will be useful) ? I guess it makes sense to add some > similarity measures in mllib... > > > On Fri, Sep 5, 2014 at 8:55 PM, Reza Zadeh > wrote: > >> Yes you're right, calling dimsum with gamma as PositiveInfinity turns >> it into the usual brute force algorithm for cosine similarity, there is >> no >> sampling. This is by design. 
>> >> >> On Fri, Sep 5, 2014 at 8:20 PM, Debasish Das < >> debasish.da...@gmail.com> wrote: >> >>> I looked at the code: similarColumns(Double.posInf) is generating >>> the brute force... >>> >>> Basically dimsum with gamma as PositiveInfinity will produce the >>> exact same result as doing catesian products of RDD[(product, vector)] >>> and >>> computing similarities or there will be some approximation ? >>> >>> Sorry I have not read your paper yet. Will read it over the weekend. >>> >>> >>> >>> On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh >>> wrote: >>> For 60M x 10K brute force and dimsum thresholding should be fine. For 60M x 10M probably brute force won't work depending on the cluster's power, and dimsum thresholding should work with appropriate threshold. Dimensionality reduction should help, and how effective it is will depend on your application and domain, it's worth trying if the direct computation doesn't work. You can also try running KMeans clustering (perhaps after dimensionality reduction) if your goal is to find batches of similar points instead of all pairs above a threshold. On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das < debasish.da...@gmail.com> wrote: > Also for tall and wide (rows ~60M, columns 10M), I am considering > running a matrix factorization to reduce the dimension to say ~60M x > 50 and > then run all pair similarity... > > Did you also try similar ideas and saw positive results ? > > > > On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das < > debasish.da...@gmail.com> wrote: > >> Ok...just to make sure I have RowMatrix[SparseVector] where rows >> are ~ 60M and columns are 10M say with billion data points... >> >> I have another version that's around 60M and ~ 10K... >> >> I guess for the second one both all pair and dimsum will run >> fine... >> >> But for tall and wide, what do you suggest ? can dimsum handle it >> ? >> >> I might need jaccard as well...can I plug that in the PR ? 
>> >> >> >> On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh >> wrote: >> >>> You might want to wait until Wednesday since the interface will >>> be changing in that PR before Wednesday, probably over the weekend, >>> so that >>> you don't have to redo your code. Your call if you need it before a >>> week. >>> Reza >>> >>> >>> On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das < >>> debasish.da...@gmail.com> wrote: >>> Ohh
Re: Accuracy hit in classification with Spark
If you are using the Mahout's Multinomial Naive Bayes, it should be the same as MLlib's. I tried MLlib with news20.scale downloaded from http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass.html and the test accuracy is 82.4%. -Xiangrui On Tue, Sep 9, 2014 at 4:58 AM, jatinpreet wrote: > Hi, > > I tried running the classification program on the famous newsgroup data. > This had an even more drastic effect on the accuracy, as it dropped from > ~82% in Mahout to ~72% in Spark MLlib. > > Please help me in this regard as I have to use Spark in a production system > very soon and this is a blocker for me. > > Thanks, > Jatin > > > > - > Novice Big Data Programmer > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p13793.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
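For anyone who wants to reproduce that comparison on the MLlib side, here is a minimal sketch (assuming Spark 1.1-era MLlib APIs; the dataset path, split ratio, and lambda value are illustrative placeholders, not what Xiangrui used):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.util.MLUtils

object News20NaiveBayes {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("News20NaiveBayes"))

    // news20.scale is already in LIBSVM format; the path is a placeholder.
    val data = MLUtils.loadLibSVMFile(sc, "news20.scale")
    val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 11L)

    // lambda is the additive (Laplace) smoothing parameter.
    val model = NaiveBayes.train(training, lambda = 1.0)

    val accuracy = test
      .map(p => if (model.predict(p.features) == p.label) 1.0 else 0.0)
      .mean()
    println(s"Test accuracy: $accuracy")
    sc.stop()
  }
}
```

MLlib's NaiveBayes here is multinomial, which is why it should be directly comparable to Mahout's Multinomial Naive Bayes.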
Problem in running mosek in spark cluster - java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738)
We have a small Apache Spark cluster of 6 computers. We are trying to solve a distributed problem which requires solving an optimization problem at each machine during a Spark map operation. We decided to use Mosek as the solver, and I obtained an academic license to this end. We observed that Mosek works fine on a single system. However, when we prepare a jar file, include mosek.jar in the library, and try to run the jar on the cluster as a Spark job, it gives errors. java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path Does this problem have anything to do with the license? We have set the necessary path variables in the profile of the user on the master machine, but we are not sure what changes need to be made to the other machines in the cluster. We would be greatly obliged if you could suggest a solution and help us out. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-running-mosek-in-spark-cluster-java-lang-UnsatisfiedLinkError-no-mosekjava7-0-in-java-lib-tp13799.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
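One way to point every executor at the native library without editing each worker's profile is Spark's extra-library-path settings. This is a sketch assuming Spark 1.x property names; the Mosek install directory shown is a hypothetical placeholder, and the native libmosekjava7_0.so must still exist at that path on every node:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical directory containing libmosekjava7_0.so; it must exist
// at this same path on every node in the cluster.
val mosekLibDir = "/opt/mosek/7/tools/platform/linux64x86/bin"

val conf = new SparkConf()
  .setAppName("MosekSolverJob")
  // Prepended to the native library search path of each executor and of
  // the driver, so System.loadLibrary can find the Mosek JNI library.
  .set("spark.executor.extraLibraryPath", mosekLibDir)
  .set("spark.driver.extraLibraryPath", mosekLibDir)

val sc = new SparkContext(conf)
```

Alternatively, export LD_LIBRARY_PATH in conf/spark-env.sh on every worker and restart the workers.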
Re: Huge matrix
Cool...can I add loadRowMatrix in your PR ? Thanks. Deb On Tue, Sep 9, 2014 at 1:14 AM, Reza Zadeh wrote: > Hi Deb, > > Did you mean to message me instead of Xiangrui? > > For TS matrices, dimsum with positiveinfinity and computeGramian have the > same cost, so you can do either one. For dense matrices with say, 1m > columns this won't be computationally feasible and you'll want to start > sampling with dimsum. > > It would be helpful to have a loadRowMatrix function, I would use it. > > Best, > Reza > > On Tue, Sep 9, 2014 at 12:05 AM, Debasish Das > wrote: > >> Hi Xiangrui, >> >> For tall skinny matrices, if I can pass a similarityMeasure to >> computeGrammian, I could re-use the SVD's computeGrammian for similarity >> computation as well... >> >> Do you recommend using this approach for tall skinny matrices or just use >> the dimsum's routines ? >> >> Right now RowMatrix does not have a loadRowMatrix function like the one >> available in LabeledPoint...should I add one ? I want to export the matrix >> out from my stable code and then test dimsum... >> >> Thanks. >> Deb >> >> >> >> On Fri, Sep 5, 2014 at 9:43 PM, Reza Zadeh wrote: >> >>> I will add dice, overlap, and jaccard similarity in a future PR, >>> probably still for 1.2 >>> >>> >>> On Fri, Sep 5, 2014 at 9:15 PM, Debasish Das >>> wrote: >>> Awesome...Let me try it out... Any plans of putting other similarity measures in future (jaccard is something that will be useful) ? I guess it makes sense to add some similarity measures in mllib... On Fri, Sep 5, 2014 at 8:55 PM, Reza Zadeh wrote: > Yes you're right, calling dimsum with gamma as PositiveInfinity turns > it into the usual brute force algorithm for cosine similarity, there is no > sampling. This is by design. > > > On Fri, Sep 5, 2014 at 8:20 PM, Debasish Das > wrote: > >> I looked at the code: similarColumns(Double.posInf) is generating the >> brute force... 
>> >> Basically dimsum with gamma as PositiveInfinity will produce the >> exact same result as doing catesian products of RDD[(product, vector)] >> and >> computing similarities or there will be some approximation ? >> >> Sorry I have not read your paper yet. Will read it over the weekend. >> >> >> >> On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh >> wrote: >> >>> For 60M x 10K brute force and dimsum thresholding should be fine. >>> >>> For 60M x 10M probably brute force won't work depending on the >>> cluster's power, and dimsum thresholding should work with appropriate >>> threshold. >>> >>> Dimensionality reduction should help, and how effective it is will >>> depend on your application and domain, it's worth trying if the direct >>> computation doesn't work. >>> >>> You can also try running KMeans clustering (perhaps after >>> dimensionality reduction) if your goal is to find batches of similar >>> points >>> instead of all pairs above a threshold. >>> >>> >>> >>> >>> On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das < >>> debasish.da...@gmail.com> wrote: >>> Also for tall and wide (rows ~60M, columns 10M), I am considering running a matrix factorization to reduce the dimension to say ~60M x 50 and then run all pair similarity... Did you also try similar ideas and saw positive results ? On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das < debasish.da...@gmail.com> wrote: > Ok...just to make sure I have RowMatrix[SparseVector] where rows > are ~ 60M and columns are 10M say with billion data points... > > I have another version that's around 60M and ~ 10K... > > I guess for the second one both all pair and dimsum will run > fine... > > But for tall and wide, what do you suggest ? can dimsum handle it ? > > I might need jaccard as well...can I plug that in the PR ? 
> > > > On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh > wrote: > >> You might want to wait until Wednesday since the interface will >> be changing in that PR before Wednesday, probably over the weekend, >> so that >> you don't have to redo your code. Your call if you need it before a >> week. >> Reza >> >> >> On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das < >> debasish.da...@gmail.com> wrote: >> >>> Ohh coolall-pairs brute force is also part of this PR ? Let >>> me pull it in and test on our dataset... >>> >>> Thanks. >>> Deb >>> >>> >>> On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh >>> wrote: >>> Hi Deb, We are adding
Re: groupBy gives non deterministic results
Thank you for your replies. More details here: The program is executed in local mode (single node). Default env params are used. The test code and the result are in this gist: https://gist.github.com/coderh/0147467f0b185462048c Here are the first 10 lines of the data: 3 fields each row, the delimiter is ";" 3801959;11775022;118 3801960;14543202;118 3801984;11781380;20 3801984;13255417;20 3802003;11777557;91 3802055;11781159;26 3802076;11782793;102 3802086;17881551;102 3802087;19064728;99 3802105;12760994;99 ... There are 27 partitions (small files). Total size is about 100 MB. We find that this problem is most probably caused by the bug SPARK-2043: https://issues.apache.org/jira/browse/SPARK-2043 Could someone give more details on this bug? The pull request says: The current implementation reads one key with the next hash code as it finishes reading the keys with the current hash code, which may cause it to miss some matches of the next key. This can cause operations like join to give the wrong result when reduce tasks spill to disk and there are hash collisions, as values won't be matched together. This PR fixes it by not reading in that next key, using a peeking iterator instead. I don't understand why reading a key with the next hash code will cause it to miss some matches of the next key. If someone could show me some code to dig into, it would be highly appreciated. =) Thank you. Hao. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13797.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
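To illustrate what the pull request describes, here is a toy sketch in plain Scala (not the actual Spark code; the first character of each key stands in for the hash code). A reader that calls next() to detect the end of a group consumes, and silently drops, the first element of the following group; a buffered (peeking) iterator can inspect the next element without consuming it:

```scala
// Buggy pattern: read one element past the group to detect its end,
// which consumes and drops the first element of the next group.
def groupsLossy(it: Iterator[(String, Int)]): List[List[(String, Int)]] = {
  var groups = List.empty[List[(String, Int)]]
  while (it.hasNext) {
    val first = it.next()
    var group = List(first)
    var done = false
    while (!done && it.hasNext) {
      val e = it.next()                 // consumed even if it starts a new group
      if (e._1.head == first._1.head) group ::= e
      else done = true                  // e is lost here -- this was the bug
    }
    groups ::= group.reverse
  }
  groups.reverse
}

// Fixed pattern: peek via BufferedIterator, consume only group members.
def groupsSafe(it: Iterator[(String, Int)]): List[List[(String, Int)]] = {
  val buf = it.buffered
  var groups = List.empty[List[(String, Int)]]
  while (buf.hasNext) {
    val key = buf.head._1.head          // peek without consuming
    var group = List.empty[(String, Int)]
    while (buf.hasNext && buf.head._1.head == key) group ::= buf.next()
    groups ::= group.reverse
  }
  groups.reverse
}
```

With input ("a",1), ("aa",2), ("b",3), ("bb",4), the lossy version drops ("b",3) exactly the way spilled values with colliding hash codes could go unmatched in a join.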
Re: spark functionality similar to hadoop's RecordWriter close method
You're mixing the Java and Scala APIs here. Your call to foreach() is expecting a Scala function and you're giving it a Java Function. Ideally you just use the Scala API, of course. Before explaining how to actually use a Java function here, maybe clarify that you have to and can't use Scala for some reason, since your declaration of the Java function also seems unintentional -- Unit and Seq are not Java types. There is no callback, and no direct analog of RecordWriter.close(). If you are writing a "foreach" function, then you want to use "foreachPartition", and then after writing a partition's worth of records, you could do something at the end of the function. This may or may not suit your purpose, as it is not necessarily called in the same way that RecordWriter.close() was. But ideally you're not relying on that kind of thing anyway. On Tue, Sep 9, 2014 at 1:39 PM, robertberta wrote: > I want to call a function for batches of elements from an rdd > > val javaClass:org.apache.spark.api.java.function.Function[Seq[String],Unit] > = new JavaClass() > rdd.mapPartitions(_.grouped(5)).foreach(javaClass) > > 1.This worked fine in spark 0.9.1 , when we upgrade to spark 1.0.2 , > Function changed from class to interface and we get : > > type mismatch; > found : org.apache.spark.api.java.function.Function[Seq[String],Unit] > required: Seq[String] => Unit > > We are using Java 1.7 > We use that class for one time initialization method call on each executor > and for batch processing . > > 2. Previously on hadoop by RecordWriter.close() we get a callback method for > every executor that processed map/reduce operations. We would like this in > spark too , is it possible? > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/spark-functionality-similar-to-hadoop-s-RecordWriter-close-method-tp13795.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. 
> > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
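A sketch of the foreachPartition pattern described above, with the per-partition setup and teardown factored into a plain function (BatchWriter and the batch size of 5 are illustrative placeholders, echoing the original poster's grouped(5)):

```scala
// Hypothetical resource standing in for whatever RecordWriter wrapped.
class BatchWriter {
  var written = 0
  var closed = false
  def write(records: Seq[String]): Unit = { written += records.size }
  def close(): Unit = { closed = true }  // the RecordWriter.close() analog
}

// Body for rdd.foreachPartition { partition => ... }: one writer per
// partition, records processed in batches of 5, close() guaranteed to
// run once the partition's iterator is exhausted.
def writePartition(partition: Iterator[String]): BatchWriter = {
  val writer = new BatchWriter()          // one-time setup per partition
  try {
    partition.grouped(5).foreach(writer.write)
  } finally {
    writer.close()                        // runs once per partition
  }
  writer
}
```

On the real RDD you would call rdd.foreachPartition(writePartition(_)), discarding the returned writer; note that, as Sean says, this runs once per partition, not once per executor lifetime.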
spark functionality similar to hadoop's RecordWriter close method
I want to call a function for batches of elements from an RDD: val javaClass:org.apache.spark.api.java.function.Function[Seq[String],Unit] = new JavaClass() rdd.mapPartitions(_.grouped(5)).foreach(javaClass) 1. This worked fine in Spark 0.9.1; when we upgraded to Spark 1.0.2, Function changed from a class to an interface and we get: type mismatch; found : org.apache.spark.api.java.function.Function[Seq[String],Unit] required: Seq[String] => Unit We are using Java 1.7. We use that class for a one-time initialization method call on each executor and for batch processing. 2. Previously on Hadoop, RecordWriter.close() gave us a callback method on every executor that processed map/reduce operations. We would like this in Spark too; is it possible? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-functionality-similar-to-hadoop-s-RecordWriter-close-method-tp13795.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Accuracy hit in classification with Spark
Hi, I tried running the classification program on the famous newsgroup data. This had an even more drastic effect on the accuracy, as it dropped from ~82% in Mahout to ~72% in Spark MLlib. Please help me in this regard as I have to use Spark in a production system very soon and this is a blocker for me. Thanks, Jatin - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p13793.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Querying a parquet file in s3 with an ec2 install
> Why I think it's the number of files is that I believe that all of those, or a large part of those files, are read when you run sqlContext.parquetFile(), and the time it would take in s3 for that to happen is a lot, so something internally is timing out. I'll create the parquet files with Drill instead of Spark, which will give me (somewhat) better control over the slice sizes, and see what happens. That said, this behavior seems wrong to me. First, exiting due to inactivity on a job seems like (perhaps?) the wrong fix to a former problem. Second, there IS activity if it's reading the slice headers, but the job is exiting anyway. So if this fixes the problem, the measure of "activity" seems wrong. Ian and Manu, thanks for your help. I'll post back and let you know if that fixes it. Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13791.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Querying a parquet file in s3 with an ec2 install
My apologies to the list. I replied to Manu's question and it went directly to him rather than the list. In case anyone else has this issue, here is my reply and Manu's reply to me. This also answers Ian's question. --- Hi Manu, The dataset is 7.5 million rows and 500 columns. In parquet form it's about 1.1 Gig. It was created with Spark and copied up to s3. It has about 4600 parts (which I'd also like to gain some control over). I can try a smaller dataset; however, it works when I run it locally, even with the file out on s3. It just takes a while. I can try copying it to HDFS first, but that won't help longer term. Thanks Jim - Manu's response: - I am pretty sure it is due to the number of parts you have. I have a parquet data set that is 250M rows and 924 columns and it is ~2500 files... I recommend creating a table in Hive with that data set and doing an insert overwrite so you can get a data set with more manageable files. Why I think it's the number of files is that I believe that all of those, or a large part of those files, are read when you run sqlContext.parquetFile(), and the time it would take in s3 for that to happen is a lot, so something internally is timing out. -Manu -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13790.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
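If the number of part files is indeed the problem, Spark itself can also rewrite the dataset with fewer partitions, without going through Hive. A sketch assuming Spark SQL 1.1-era APIs, where coalesce preserves the SchemaRDD (the paths and the target partition count of 64 are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("CompactParquet"))
val sqlContext = new SQLContext(sc)

// Read the ~4600-part dataset, shrink it to a manageable number of
// files, and write it back out. 64 is an arbitrary example target.
val data = sqlContext.parquetFile("s3n://bucket/wide-table")      // placeholder path
data.coalesce(64).saveAsParquetFile("s3n://bucket/wide-table-64") // placeholder path
```

Since coalesce without a shuffle merges existing partitions, this avoids a full reshuffle of the 1.1 Gig of data.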
Re: Filter function problem
To help anyone answer, I can say that I checked the inactiveIDs.filter operation separately, and I found that it doesn't return null in any case. In addition, I don't know how to handle (or check) whether an RDD is null. I find the debugging too complicated to pinpoint the error. Any ideas how to find the null pointer? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Filter-function-problem-tp13787p13789.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Filter function problem
I have the following code written in scala in Spark: (inactiveIDs is a RDD[(Int, Seq[String])], persons is a Broadcast[RDD[(Int, Seq[Event])]] and Event is a class that I have created) val test = persons.value .map{tuple => (tuple._1, tuple._2 .filter{event => inactiveIDs.filter(event2 => event2._1 == tuple._1).count() != 0})} and the following error: java.lang.NullPointerException Any ideas? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Filter-function-problem-tp13787.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark streaming: size of DStream
Yes, I agree with directly transforming on the DStream even when there is no data injected in a batch duration. My situation is: Spark receives the flume stream continuously, and I use the updateStateByKey function to collect data for a key across several batches; then I handle the collected data after waiting a specified time (which I measure with a counter) since the first time no data was updated in the updateStateByKey operation. Normally, when the waiting time is up, I should have collected all data for a key. But if the flume data source is broken for a while, and this interval exceeds the waiting time, then I will only get partial data for a key. So I need a way to determine whether the current flume stream batch contains data; if not, it means the flume data source is broken, and I can skip the updateStateByKey operation until the flume data source is reconnected, at which point the counter in the updateStateByKey function can count again. This way I would get the intact data. Another question: why does the count variable not work in map but take effect in the foreachRDD in my previous code? thanks :P -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13785.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
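A sketch of the counter-in-state pattern described above (all names and the wait threshold are illustrative; it relies on the fact that updateStateByKey calls the update function with an empty Seq for existing keys in batches where no new data arrives for them):

```scala
// State per key: the values collected so far, plus how many consecutive
// batches have passed with no new data for that key.
case class KeyState(values: Seq[String], silentBatches: Int)

// Illustrative "waiting time", expressed in batch intervals.
val maxSilentBatches = 5

// Update function to pass to stream.updateStateByKey(update).
def update(newValues: Seq[String], state: Option[KeyState]): Option[KeyState] = {
  val prev = state.getOrElse(KeyState(Seq.empty, 0))
  if (newValues.nonEmpty)
    Some(KeyState(prev.values ++ newValues, 0))       // data arrived: reset the counter
  else if (prev.silentBatches + 1 < maxSilentBatches)
    Some(prev.copy(silentBatches = prev.silentBatches + 1)) // keep waiting
  else
    None  // waited long enough: handle prev.values, then drop the key
}
```

As noted in this thread, this alone cannot distinguish "no data for this key" from "flume source down", so you would still pause the counting in batches where the whole stream is empty.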
RE: Spark streaming: size of DStream
I think you should clarify some things in Spark Streaming: 1. The closure in map runs on the remote side, so modifying the count var will only take effect on the remote side. You will always get -1 on the driver side. 2. Some code in the closure in foreachRDD is lazily executed in each batch duration, while the if (...) code outside the closure is executed once immediately and will never be executed again, so your code logic does not behave as you expect. 3. I don't think you need to judge whether data was fed in before doing transformations; you can directly transform on the DStream even when no data is injected in this batch duration -- it's only an empty transformation, with no specific extra overhead. Thanks Jerry -Original Message- From: julyfire [mailto:hellowe...@gmail.com] Sent: Tuesday, September 09, 2014 4:20 PM To: u...@spark.incubator.apache.org Subject: RE: Spark streaming: size of DStream i'm sorry I have some error in my code, update here: var count = -1L // a global variable in the main object val currentBatch = some_DStream val countDStream = currentBatch.map(o=>{ count = 0L // reset the count variable in each batch o }) countDStream.foreachRDD(rdd=> count += rdd.count()) if (count > 0) { currentBatch.map(...).someOtherTransformation } two problems: 1. the variable count just go on accumulate and no reset in each batch 2. if(count > 0) only evaluate in the beginning of running the program, so the next statement will never run Can you all give me some suggestion? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
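Jerry's point 1 can be made concrete: to fold counts back to the driver, use rdd.count() inside foreachRDD (an action whose result returns to the driver) and, for a cross-batch running total, an accumulator rather than a captured var. A sketch assuming Spark 1.x streaming APIs; the names are illustrative:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream

// Assigning `count = 0L` inside map() mutates a serialized copy of the
// variable on the executors; the driver's copy never changes. Per-batch
// logic therefore belongs in foreachRDD, which runs on the driver.
def countRecords(sc: SparkContext, stream: DStream[String]): Unit = {
  val total = sc.accumulator(0L) // driver-visible running total
                                 // (accumulators also accept updates from tasks)
  stream.foreachRDD { rdd =>
    val batchCount = rdd.count() // action: computed on the cluster,
                                 // result returned to the driver
    total += batchCount
    if (batchCount > 0) {
      // per-batch driver-side decision: only act when data actually arrived
      println(s"this batch: $batchCount, total so far: ${total.value}")
    }
  }
}
```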
Re: Spark streaming: size of DStream
If you take into account what streaming means in spark, your goal doesn't really make sense; you have to assume that your streams are infinite and you will have to process them till the end of the days. Operations on a DStream define what you want to do with each element of each RDD, but spark streaming is smart enough to not apply the transformations if RDD are empty. The only time where you probably want to know the size of the RDD is when you are going to perform a side-effect like storing something in a database, using foreachRDD, i.e: val flumeStream = ... val transformedStream = flumeStream.map(... some transformation ...).flatMap(... some other transformation).distinct(). transformedStream.foreachRDD { rdd => if (rdd.count() != 0) { // perform some side effect that shouldn't be done if a transformed batch is empty } } 2014-09-09 9:20 GMT+01:00 julyfire : > i'm sorry I have some error in my code, update here: > > var count = -1L // a global variable in the main object > > val currentBatch = some_DStream > val countDStream = currentBatch.map(o=>{ > count = 0L // reset the count variable in each batch > o > }) > countDStream.foreachRDD(rdd=> count += rdd.count()) > > if (count > 0) { > currentBatch.map(...).someOtherTransformation > } > > two problems: > 1. the variable count just go on accumulate and no reset in each batch > 2. if(count > 0) only evaluate in the beginning of running the program, so > the next statement will never run > > Can you all give me some suggestion? thanks > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: groupBy gives non deterministic results
Can you provide small sample or test data that reproduce this problem? and what's your env setup? single node or cluster? Sent from my iPhone > On 2014年9月8日, at 22:29, redocpot wrote: > > Hi, > > I have a key-value RDD called rdd below. After a groupBy, I tried to count > rows. > But the result is not unique, somehow non deterministic. > > Here is the test code: > > val step1 = ligneReceipt_cleTable.persist > val step2 = step1.groupByKey > > val s1size = step1.count > val s2size = step2.count > > val t = step2 // rdd after groupBy > > val t1 = t.count > val t2 = t.count > val t3 = t.count > val t4 = t.count > val t5 = t.count > val t6 = t.count > val t7 = t.count > val t8 = t.count > > println("s1size = " + s1size) > println("s2size = " + s2size) > println("1 => " + t1) > println("2 => " + t2) > println("3 => " + t3) > println("4 => " + t4) > println("5 => " + t5) > println("6 => " + t6) > println("7 => " + t7) > println("8 => " + t8) > > Here are the results: > > s1size = 5338864 > s2size = 5268001 > 1 => 5268002 > 2 => 5268001 > 3 => 5268001 > 4 => 5268002 > 5 => 5268001 > 6 => 5268002 > 7 => 5268002 > 8 => 5268001 > > Even if the difference is just one row, that's annoying. > > Any idea ? > > Thank you. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark streaming: size of DStream
I'm sorry, I had some errors in my code; updated here: var count = -1L // a global variable in the main object val currentBatch = some_DStream val countDStream = currentBatch.map(o=>{ count = 0L // reset the count variable in each batch o }) countDStream.foreachRDD(rdd=> count += rdd.count()) if (count > 0) { currentBatch.map(...).someOtherTransformation } two problems: 1. the variable count just keeps accumulating and is never reset in each batch 2. if (count > 0) is only evaluated at the beginning of running the program, so the next statement will never run Can you all give me some suggestions? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Huge matrix
Hi Deb, Did you mean to message me instead of Xiangrui? For tall skinny (TS) matrices, DIMSUM with gamma set to PositiveInfinity and computeGramian have the same cost, so you can do either one. For dense matrices with, say, 1M columns this won't be computationally feasible and you'll want to start sampling with DIMSUM. It would be helpful to have a loadRowMatrix function; I would use it. Best, Reza

On Tue, Sep 9, 2014 at 12:05 AM, Debasish Das wrote:
> Hi Xiangrui,
>
> For tall skinny matrices, if I can pass a similarityMeasure to
> computeGrammian, I could re-use the SVD's computeGrammian for similarity
> computation as well...
>
> Do you recommend using this approach for tall skinny matrices, or just use
> the dimsum routines?
>
> Right now RowMatrix does not have a loadRowMatrix function like the one
> available in LabeledPoint... should I add one? I want to export the matrix
> out from my stable code and then test dimsum...
>
> Thanks.
> Deb
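The brute-force baseline discussed in this thread (DIMSUM with gamma as PositiveInfinity, i.e. no sampling) is just all-pairs cosine similarity over the columns. As a minimal, Spark-free sketch of what that computes — function name and the dense row-list representation are illustrative, not MLlib's API:

```python
import math

def column_cosine_similarities(rows):
    """All-pairs cosine similarity between columns of a dense row matrix.

    This is the exact computation that DIMSUM reduces to when the
    sampling threshold gamma is +infinity (no sampling at all).
    """
    ncols = len(rows[0])
    # Column norms ||c_j|| for each column j.
    norms = [math.sqrt(sum(r[j] ** 2 for r in rows)) for j in range(ncols)]
    sims = {}
    for i in range(ncols):
        for j in range(i + 1, ncols):
            dot = sum(r[i] * r[j] for r in rows)
            sims[(i, j)] = dot / (norms[i] * norms[j])
    return sims

# Two proportional columns have cosine similarity 1.0.
sims = column_cosine_similarities([[1.0, 2.0], [2.0, 4.0], [3.0, 6.0]])
```

The O(rows * columns^2) double loop is exactly why the 60M x 10M case needs sampling (DIMSUM with a finite threshold) or dimensionality reduction, while 60M x 10K can afford the brute-force pass.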
Re: Spark driver application can not connect to Spark-Master
Hi, I had the same issue in my Java code while I was trying to connect to a locally hosted Spark server (started with sbin/start-all.sh etc.) from an IDE (IntelliJ). I packaged my app into a jar and submitted it with spark-submit (in bin/), and it worked! Hope this helps. Rgds -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-driver-application-can-not-connect-to-Spark-Master-tp13226p13779.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Spark streaming: size of DStream
Thanks all. Yes, I did use foreachRDD; the following is my code:

var count = -1L // a global variable in the main object
val currentBatch = some_DStream
val countDStream = currentBatch.map(o => {
  count = 0L // reset the count variable in each batch
  o
})
countDStream.foreachRDD(rdd => { println(count); count += rdd.count() })

The variable count should store the number of records of each batch, but it can't be reset to 0; the statement count = 0L does not work. Is my code right? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13778.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
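The reason the reset in map does not work is that the map closure is serialized and executed on the workers, so each worker mutates its own copy of count, never the driver's. A Spark-free Python sketch of the same effect, using worker processes to stand in for executors (illustrative only, not Spark's actual serialization machinery):

```python
from multiprocessing import Pool

count = 0  # "driver-side" variable

def mapper(x):
    # Runs in a worker process: mutates the worker's copy of count,
    # and the change never propagates back to the parent ("driver").
    global count
    count += x
    return x

if __name__ == "__main__":
    with Pool(2) as pool:
        pool.map(mapper, [1, 2, 3, 4])
    # The driver's copy is untouched: count is still 0 here.
```

In Spark the per-batch reset and tally belong on the driver, inside foreachRDD — for example `stream.foreachRDD { rdd => count = rdd.count() }` — or in an accumulator if the workers must contribute to the total.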
RE: Spark streaming: size of DStream
Hi, I think each received stream will generate an RDD in each batch duration even if there is no data fed in (an empty RDD will be generated). So you cannot use the number of RDDs to judge whether any data was received. One way is to do the check inside DStream.foreachRDD(), like:

a.foreachRDD { r =>
  if (r.count() == 0) {
    // do something
  } else {
    // do some other things
  }
}

You can try it. Thanks, Jerry -Original Message- From: julyfire [mailto:hellowe...@gmail.com] Sent: Tuesday, September 09, 2014 3:42 PM To: u...@spark.incubator.apache.org Subject: RE: Spark streaming: size of DStream Hi Jerry, Thanks for your reply. I use spark streaming to receive the flume stream, then I need to do a judgement, in each batchDuration, if the received stream has data, then I should do something, if no data, do the other thing. Then I thought the count() can give me the measure, but it returns a DStream, not a number. so is there a way to achieve this case? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
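The per-batch check suggested above can be sketched without Spark: treat each batch's collection as the RDD and branch on its count. Plain-Python stand-in (in real code this function body is what you pass to stream.foreachRDD; len() plays the role of rdd.count()):

```python
def handle_batch(records):
    """Stand-in for the function passed to DStream.foreachRDD.

    `records` plays the role of one batch's RDD.
    """
    if len(records) == 0:
        # Empty batch: skip the heavy work, or run the fallback logic.
        return "empty-batch path"
    # Non-empty batch: normal processing goes here.
    return f"processed {len(records)} records"

# Three simulated batch durations; the middle one received no data.
results = [handle_batch(b) for b in [[10, 20], [], [30]]]
```

Note that even the empty batches reach this function — which is the point of the answer above: detect emptiness per batch with count(), rather than trying to count RDDs at the DStream level.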
Re: Spark streaming: size of DStream
How about calling foreachRDD, processing whatever data is in each RDD normally, and also keeping track within the foreachRDD function of whether any RDD had a count() > 0? If not, then at the end you can execute your alternate logic for the no-data case. I don't think you want to operate at the DStream level. On Tue, Sep 9, 2014 at 8:41 AM, julyfire wrote: > Hi Jerry, > > Thanks for your reply. > I use spark streaming to receive the flume stream, then I need to do a > judgement, in each batchDuration, if the received stream has data, then I > should do something, if no data, do the other thing. Then I thought the > count() can give me the measure, but it returns a DStream, not a number. > so is there a way to achieve this case? > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. >
RE: Spark streaming: size of DStream
Hi Jerry, Thanks for your reply. I use Spark Streaming to receive the flume stream; then I need to make a judgement in each batchDuration: if the received stream has data, I should do something; if there is no data, do the other thing. I thought count() could give me that measure, but it returns a DStream, not a number. So is there a way to achieve this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Spark streaming: size of DStream
Hi, Is there any specific scenario that needs to know the number of RDDs in the DStream? To my knowledge, a DStream generates one RDD in each batchDuration; some old RDDs will be remembered for windowing-like functions and removed when no longer useful. The hashmap generatedRDDs in DStream.scala contains the RDDs you want, though you cannot access it from an app. Besides, the count() API returns the number of records in each of this DStream's RDDs, not the number of RDDs; the number of RDDs per batch should always be 1 as I understand it. Thanks, Jerry -Original Message- From: julyfire [mailto:hellowe...@gmail.com] Sent: Tuesday, September 09, 2014 2:42 PM To: u...@spark.incubator.apache.org Subject: Spark streaming: size of DStream I want to implement the following logic:

val stream = getFlumeStream() // a DStream
if (size_of_stream > 0) // if the DStream contains some RDDs
  stream.someTransformation

stream.count() can figure out the number of RDDs in a DStream, but it returns a DStream[Long] and can't be compared with a number. Does anyone know how to get the number of RDDs in a DStream? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Accuracy hit in classification with Spark
Hi, I had been using Mahout's Naive Bayes algorithm to classify document data. For a specific train and test set, I was getting accuracy in the range of 86%. When I shifted to Spark's MLlib, the accuracy dropped to the vicinity of 82%. I am using the same version of Lucene and the same logic to generate TF-IDF vectors. I tried fiddling with the smoothing parameter, but to no avail. My question is: if the underlying algorithm is the same in both Mahout and MLlib, why is this accuracy dip being observed? - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
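One concrete thing to check before digging deeper: that both implementations use the same additive-smoothing value (MLlib's lambda parameter to NaiveBayes.train, default 1.0). For multinomial Naive Bayes the smoothed per-term log-likelihood is log((count + lambda) / (total + lambda * |V|)), so a different smoothing default shifts every class score, especially for terms unseen in a class. A small pure-Python illustration (function name and numbers are illustrative):

```python
import math

def smoothed_log_likelihood(term_count, class_total, vocab_size, lam):
    # Additive (Laplace/Lidstone) smoothing for multinomial Naive Bayes:
    # P(term | class) = (count + lambda) / (total + lambda * |V|)
    return math.log((term_count + lam) / (class_total + lam * vocab_size))

# The penalty for a term unseen in a class depends strongly on lambda:
hard = smoothed_log_likelihood(0, 1000, 50000, 1.0)   # lambda = 1.0
soft = smoothed_log_likelihood(0, 1000, 50000, 0.01)  # lambda = 0.01
```

Also worth checking: Mahout ships a complementary Naive Bayes variant in addition to the standard one; if the Mahout run used complementary NB, that alone (not smoothing) could account for a few points of accuracy versus MLlib's plain multinomial NB.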
Re: groupBy gives non deterministic results
What's the type of the key? If the hash of the key differs across slaves, then you can get these confusing results. We hit a similar problem in Python, because the hash of None is different across machines. Davies On Mon, Sep 8, 2014 at 8:16 AM, redocpot wrote: > Update: > > Just tested with HashPartitioner(8) and a count on each partition: > > List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), (5,657591), (6,658327), (7,658434)), > List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), (5,657594), (6,658326), (7,658434)), > List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), (5,657592), (6,658326), (7,658435)), > List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), (5,657591), (6,658326), (7,658434)), > List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), (5,657592), (6,658326), (7,658435)), > List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), (5,657592), (6,658326), (7,658435)), > List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), (5,657592), (6,658326), (7,658435)), > List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), (5,657591), (6,658326), (7,658435)) > > Note that the counts for partitions 5-7 vary between runs. The result is not identical for each execution. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13702.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org >
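The mechanism behind the answer above: a hash partitioner routes a key to partition hash(key) % n. If hash(key) can disagree between worker processes — as with None in CPython versions of that era (its hash was derived from the object's address) or strings under PYTHONHASHSEED randomization — the same key lands in different partitions on different machines, and per-partition counts wobble between runs. Minimal pure-Python sketch of the routing rule (illustrative, not PySpark's internal partitioner):

```python
def partition_of(key, num_partitions):
    # What a hash partitioner computes on each worker. If hash(key)
    # disagrees between workers, the same key is routed inconsistently.
    return hash(key) % num_partitions

# Small ints hash to themselves in CPython, so they route consistently:
p = partition_of(12345, 8)  # always 12345 % 8 == 1
```

Keys with stable, value-based hashes (ints, or strings with PYTHONHASHSEED pinned to the same value on every worker) avoid the problem; PySpark itself later moved to a portable deterministic key hash for this reason.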
Re: Huge matrix
Hi Xiangrui, For tall skinny matrices, if I can pass a similarityMeasure to computeGrammian, I could re-use the SVD's computeGrammian for similarity computation as well... Do you recommend using this approach for tall skinny matrices or just use the dimsum's routines ? Right now RowMatrix does not have a loadRowMatrix function like the one available in LabeledPoint...should I add one ? I want to export the matrix out from my stable code and then test dimsum... Thanks. Deb On Fri, Sep 5, 2014 at 9:43 PM, Reza Zadeh wrote: > I will add dice, overlap, and jaccard similarity in a future PR, probably > still for 1.2 > > > On Fri, Sep 5, 2014 at 9:15 PM, Debasish Das > wrote: > >> Awesome...Let me try it out... >> >> Any plans of putting other similarity measures in future (jaccard is >> something that will be useful) ? I guess it makes sense to add some >> similarity measures in mllib... >> >> >> On Fri, Sep 5, 2014 at 8:55 PM, Reza Zadeh wrote: >> >>> Yes you're right, calling dimsum with gamma as PositiveInfinity turns it >>> into the usual brute force algorithm for cosine similarity, there is no >>> sampling. This is by design. >>> >>> >>> On Fri, Sep 5, 2014 at 8:20 PM, Debasish Das >>> wrote: >>> I looked at the code: similarColumns(Double.posInf) is generating the brute force... Basically dimsum with gamma as PositiveInfinity will produce the exact same result as doing catesian products of RDD[(product, vector)] and computing similarities or there will be some approximation ? Sorry I have not read your paper yet. Will read it over the weekend. On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh wrote: > For 60M x 10K brute force and dimsum thresholding should be fine. > > For 60M x 10M probably brute force won't work depending on the > cluster's power, and dimsum thresholding should work with appropriate > threshold. 
> > Dimensionality reduction should help, and how effective it is will > depend on your application and domain, it's worth trying if the direct > computation doesn't work. > > You can also try running KMeans clustering (perhaps after > dimensionality reduction) if your goal is to find batches of similar > points > instead of all pairs above a threshold. > > > > > On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das > wrote: > >> Also for tall and wide (rows ~60M, columns 10M), I am considering >> running a matrix factorization to reduce the dimension to say ~60M x 50 >> and >> then run all pair similarity... >> >> Did you also try similar ideas and saw positive results ? >> >> >> >> On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das < >> debasish.da...@gmail.com> wrote: >> >>> Ok...just to make sure I have RowMatrix[SparseVector] where rows are >>> ~ 60M and columns are 10M say with billion data points... >>> >>> I have another version that's around 60M and ~ 10K... >>> >>> I guess for the second one both all pair and dimsum will run fine... >>> >>> But for tall and wide, what do you suggest ? can dimsum handle it ? >>> >>> I might need jaccard as well...can I plug that in the PR ? >>> >>> >>> >>> On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh >>> wrote: >>> You might want to wait until Wednesday since the interface will be changing in that PR before Wednesday, probably over the weekend, so that you don't have to redo your code. Your call if you need it before a week. Reza On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das < debasish.da...@gmail.com> wrote: > Ohh coolall-pairs brute force is also part of this PR ? Let me > pull it in and test on our dataset... > > Thanks. > Deb > > > On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh > wrote: > >> Hi Deb, >> >> We are adding all-pairs and thresholded all-pairs via dimsum in >> this PR: https://github.com/apache/spark/pull/1778 >> >> Your question wasn't entirely clear - does this answer it? 
>> >> Best, >> Reza >> >> >> On Fri, Sep 5, 2014 at 6:14 PM, Debasish Das < >> debasish.da...@gmail.com> wrote: >> >>> Hi Reza, >>> >>> Have you compared with the brute force algorithm for similarity >>> computation with something like the following in Spark ? >>> >>> https://github.com/echen/scaldingale >>> >>> I am adding cosine similarity computation but I do want to >>> compute an all pair similarities... >>> >>> Note that the data is sparse for me (the data that goes to >>> matrix factorization) so I don't think joining and group-by