Re: Map-Side Join in Spark
If you are using a pair RDD, then you can use the partitionBy method to provide your partitioner. On 21 Apr 2015 15:04, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: What is re-partition? On Tue, Apr 21, 2015 at 10:23 AM, ayan guha guha.a...@gmail.com wrote: In my understanding you need to create a key out of the data and repartition both datasets to achieve a map-side join. On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Can someone share their working code for a map-side join in Spark + Scala (no Spark SQL)? The only resource I could find was this (open in Chrome with a Chinese-to-English translator): http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/ -- Deepak -- Deepak
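A minimal sketch of the co-partitioning approach described in this thread, in Scala; the item/listing data and partition count are illustrative only:

import org.apache.spark.{SparkConf, SparkContext, HashPartitioner}

// Illustrative item/listing pair RDDs keyed by itemId.
val sc = new SparkContext(new SparkConf().setAppName("map-side-join").setMaster("local[*]"))
val items    = sc.parallelize(Seq((1, "item-1"), (2, "item-2")))
val listings = sc.parallelize(Seq((1, "listing-a"), (2, "listing-b"), (2, "listing-c")))

// Co-partition both pair RDDs with the same partitioner and cache the reused side;
// the join then finds matching keys in the same partition and avoids re-shuffling
// the already-partitioned data.
val partitioner  = new HashPartitioner(4)
val itemsPart    = items.partitionBy(partitioner).cache()
val listingsPart = listings.partitionBy(partitioner)

val joined = itemsPart.join(listingsPart)   // RDD[(Int, (String, String))]
joined.collect().foreach(println)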
Re: Spark and accumulo
You can simply use a custom inputformat (AccumuloInputFormat) with the hadoop RDDs (sc.newApiHadoopFile etc) for that, all you need to do is to pass the jobConfs. Here's pretty clean discussion: http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook#answers-header Thanks Best Regards On Tue, Apr 21, 2015 at 9:55 AM, madhvi madhvi.gu...@orkash.com wrote: Hi all, Is there anything to integrate spark with accumulo or make spark to process over accumulo data? Thanks Madhvi Gupta - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
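A rough Scala sketch of the newAPIHadoopRDD approach mentioned above; the Accumulo connection details are deliberately left as placeholders (see the linked Stack Overflow discussion for the full job configuration), and the class names should be checked against your Accumulo version:

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("accumulo-rdd"))

// A Hadoop Job object is used only as a holder for the Accumulo input configuration.
// Connector info, zookeeper instance and input table are set here (omitted in this sketch).
val job = Job.getInstance(sc.hadoopConfiguration)
// AccumuloInputFormat.setConnectorInfo(job, user, token)
// AccumuloInputFormat.setInputTableName(job, "mytable")
// AccumuloInputFormat.setZooKeeperInstance(job, clientConfig)

// Hand the populated configuration to Spark; each record is an Accumulo (Key, Value) pair.
val accumuloRdd = sc.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[AccumuloInputFormat],
  classOf[Key],
  classOf[Value])

println(accumuloRdd.count())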
WebUI shows poor locality during task scheduling
Hi, When running an experimental KMeans job, the cached RDD is the original points data. I see poor locality in the task details on the WebUI: almost half of the task input is Network instead of Memory, and a task with network input takes almost the same time as a task with Hadoop (disk) input, and about twice as long as a task with memory input. e.g. Task(Memory): 16s Task(Network): 9s Task(Hadoop): 9s I see that fetching a 30MB RDD block from a remote node takes 5 seconds in the executor logs, like below: 15/03/31 04:08:52 INFO CoarseGrainedExecutorBackend: Got assigned task 58 15/03/31 04:08:52 INFO Executor: Running task 15.0 in stage 1.0 (TID 58) 15/03/31 04:08:52 INFO HadoopRDD: Input split: hdfs://master:8000/kmeans/data-Kmeans-5.3g:2013265920+134217728 15/03/31 04:08:52 INFO BlockManager: Found block rdd_3_15 locally 15/03/31 04:08:58 INFO Executor: Finished task 15.0 in stage 1.0 (TID 58). 1920 bytes result sent to driver 15/03/31 04:08:58 INFO CoarseGrainedExecutorBackend: Got assigned task 60 -Task60 15/03/31 04:08:58 INFO Executor: Running task 17.0 in stage 1.0 (TID 60) 15/03/31 04:08:58 INFO HadoopRDD: Input split: hdfs://master:8000/kmeans/data-Kmeans-5.3g:2281701376+134217728 15/03/31 04:09:02 INFO BlockManager: Found block rdd_3_17 remotely 15/03/31 04:09:12 INFO Executor: Finished task 17.0 in stage 1.0 (TID 60). 1920 bytes result sent to driver So 1) does that mean I should cache the RDD with MEMORY_AND_DISK instead of memory only? 2) And should I expand network capacity or tune the scheduling locality parameters? Any suggestion is welcome. --Env info--- Cluster: 4 workers, each with 1 core and 2G executor memory Spark version: 1.1.0 Network: 30MB/s Submit shell: bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans --master spark://master:7077 --executor-memory 1g lib/spark-examples-1.1.0-hadoop2.3.0.jar hdfs://master:8000/kmeans/data-Kmeans-7g 8 1 Thanks very much, and forgive my poor English. -- Wang Haihua
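For question 1), a one-line sketch of caching with a disk fallback (assuming a SparkContext sc as in the spark-shell); this avoids dropping and refetching partitions that don't fit in memory, though it does not by itself change where tasks are scheduled. For question 2), the spark.locality.wait settings control how long the scheduler waits for a node-local slot before falling back to a less local one.

import org.apache.spark.storage.StorageLevel

// Cache the points with a disk fallback instead of memory-only, so partitions
// that don't fit in memory are spilled locally rather than dropped and re-read.
val points = sc.textFile("hdfs://master:8000/kmeans/data-Kmeans-5.3g")
  .persist(StorageLevel.MEMORY_AND_DISK)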
Re: how to make a spark cluster ?
Actually if you only have one machine, just use the Spark local mode. Just download the Spark tarball, untar it, set master to local[N], where N = number of cores. You are good to go. There is no setup of job tracker or Hadoop. On Mon, Apr 20, 2015 at 3:21 PM, haihar nahak harihar1...@gmail.com wrote: Thank you :) On Mon, Apr 20, 2015 at 4:46 PM, Jörn Franke jornfra...@gmail.com wrote: Hi, If you have just one physical machine then I would try out Docker instead of a full VM (would be waste of memory and CPU). Best regards Le 20 avr. 2015 00:11, hnahak harihar1...@gmail.com a écrit : Hi All, I've big physical machine with 16 CPUs , 256 GB RAM, 20 TB Hard disk. I just need to know what should be the best solution to make a spark cluster? If I need to process TBs of data then 1. Only one machine, which contain driver, executor, job tracker and task tracker everything. 2. create 4 VMs and each VM should consist 4 CPUs , 64 GB RAM 3. create 8 VMs and each VM should consist 2 CPUs , 32 GB RAM each please give me your views/suggestions -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-spark-cluster-tp22563.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- {{{H2N}}}-(@:
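A minimal local-mode sketch for a machine like the one described (the application name and core count are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// local[16] runs the driver and executors in one JVM using 16 cores; no cluster
// manager, job tracker or Hadoop daemons are needed.
val conf = new SparkConf().setAppName("single-machine").setMaster("local[16]")
val sc = new SparkContext(conf)

println(sc.parallelize(1 to 1000000).sum())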
Re: Understanding the build params for spark with sbt.
With Maven you could do something like: mvn -Dhadoop.version=2.3.0 -DskipTests clean package -pl core Thanks Best Regards On Mon, Apr 20, 2015 at 8:10 PM, Shiyao Ma i...@introo.me wrote: Hi. My usage only involves Spark core and HDFS, so no Spark SQL, MLlib or other components are involved. I saw the hint on http://spark.apache.org/docs/latest/building-spark.html, with a sample like: build/sbt -Pyarn -Phadoop-2.3 assembly. (What's the -P for?) Fundamentally, I'd like to let sbt only compile and package the core and the Hadoop parts. Meanwhile, it would be appreciated if you could tell me which Scala file controls the logic of -Pyarn, so that I can dig into the build source and have finer control. Thanks. -- I am a cat. Homepage: http://introo.me - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Map-Side Join in Spark
These are pair RDDs: (itemId, item) and (itemId, listing). What do you mean by re-partitioning these RDDs? And what do you mean by "your partitioner"? Can you elaborate? On Tue, Apr 21, 2015 at 11:18 AM, ayan guha guha.a...@gmail.com wrote: If you are using a pair RDD, then you can use the partitionBy method to provide your partitioner On 21 Apr 2015 15:04, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: What is re-partition? On Tue, Apr 21, 2015 at 10:23 AM, ayan guha guha.a...@gmail.com wrote: In my understanding you need to create a key out of the data and repartition both datasets to achieve a map-side join. On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Can someone share their working code for a map-side join in Spark + Scala (no Spark SQL)? The only resource I could find was this (open in Chrome with a Chinese-to-English translator): http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/ -- Deepak -- Deepak -- Deepak
Re: meet weird exception when studying rdd caching
It could be a similar issue as https://issues.apache.org/jira/browse/SPARK-4300 Thanks Best Regards On Tue, Apr 21, 2015 at 8:09 AM, donhoff_h 165612...@qq.com wrote: Hi, I am studying the RDD Caching function and write a small program to verify it. I run the program in a Spark1.3.0 environment and on Yarn cluster. But I meet a weird exception. It isn't always generated in the log. Only sometimes I can see this exception. And it does not affect the output of my program. Could anyone explain why this happens and how to eliminate it? My program and the exception is listed in the following. Thanks very much for the help! *Program* object TestSparkCaching01 { def main(args: Array[String]) { val conf = new SparkConf() conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer) conf.set(spark.kryo.registrationRequired,true) conf.registerKryoClasses(Array(classOf[MyClass1],classOf[Array[MyClass1]])) val inFile = hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt val sc = new SparkContext(conf) val rdd = sc.textFile(inFile) rdd.cache() rdd.map(Cache String: +_).foreach(println ) sc.stop() } } *Exception* 15/04/21 09:58:25 WARN channel.DefaultChannelPipeline: An exception was thrown by an exception handler. java.util.concurrent.RejectedExecutionException: Worker has already been shutdown at org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72) at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56) at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) at org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34) at org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496) at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54) at org.jboss.netty.channel.Channels.disconnect(Channels.java:781) at org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211) at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223) at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222) at scala.util.Success.foreach(Try.scala:205) at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204) at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 15/04/21 09:58:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Re: Custom partitioning of DStream
I think DStream.transform is the one that you are looking for. Thanks Best Regards On Mon, Apr 20, 2015 at 9:42 PM, Evo Eftimov evo.efti...@isecc.com wrote: Is the only way to implement custom partitioning of a DStream via the foreach approach, so as to gain access to the actual RDDs comprising the DStream and hence their partitionBy method? DStream has only a repartition method, which accepts only the number of partitions, but not the method of partitioning. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
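A small sketch of the transform approach, assuming a DStream of (String, Int) pairs; transform applies an arbitrary RDD-to-RDD function to every batch, which exposes partitionBy and any custom Partitioner:

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

// Re-partition every batch of the stream with an explicit partitioner.
def repartitionByKey(stream: DStream[(String, Int)]): DStream[(String, Int)] =
  stream.transform(rdd => rdd.partitionBy(new HashPartitioner(32)))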
Re: Configuring logging properties for executor
Hi, I would like to report on trying the first option proposed by Lan - putting the log4j.properties file under the root of my application jar. It doesn't look like it is working in my case: submitting the application to Spark from the application code (not with spark-submit). It seems that in this case the executor doesn't see the log4j.properties that is located in the application jar and will use the default properties file. I can conclude it from the fact that the log is not created, and that's what I see in the executor console: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties I've also tried to add the below config to the application code, which didn't have any apparent influence: sparkConf.set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=log4j.properties"); On Mon, Apr 20, 2015 at 5:59 PM, Lan Jiang ljia...@gmail.com wrote: Rename your log4j_special.properties file as log4j.properties and place it under the root of your jar file, and you should be fine. If you are using Maven to build your jar, please put the log4j.properties in the src/main/resources folder. However, please note that if you have another dependency jar file in the classpath that contains a log4j.properties file this way, it might not work, since the first log4j.properties file that is loaded will be used. You can also do spark-submit --files log4j_special.properties ..., which should transfer your log4j property file to the worker nodes automatically without you copying them manually. Lan On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com wrote: Hi all, I need to configure the Spark executor log4j.properties on a standalone cluster. It looks like placing the relevant properties file in the Spark configuration folder and setting spark.executor.extraJavaOptions from my application code: sparkConf.set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=log4j_special.properties"); does the job, and the executor logs are written in the required place and at the required level. As far as I understand, it works because the Spark configuration folder is on the class path, and passing the parameter without a path works here. However, I would like to avoid deploying these properties to each worker's Spark configuration folder. I wonder, if I put the properties in my application jar, is there any way of telling the executor to load them? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Scala Version?
Without the rest of your code it's hard to make sense of errors. Why do you need to use reflection? Make sure you use the same Scala versions throughout and 2.10.4 is recommended. That's still the official version for Spark, even though provisional support for 2.11 exists. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 21, 2015 at 12:26 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: While running a my Spark Application over 1.3.0 with Scala 2.10.0 i encountered 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage 2.0 (TID 28) java.lang.UnsupportedOperationException: tail of empty list at scala.collection.immutable.Nil$.tail(List.scala:339) at scala.collection.immutable.Nil$.tail(List.scala:334) at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172) My app has quite a lot of reflection usage as i need to build avro schema. I was told on stack overflow that this error is because reflection is not thread safe in scala 2.10 and was advised to use scala 2.11 Hence I moved to scala 2.11 and moved all spark maven dependencies to 2.11 (spark-avro is still 2.10 as 2.11 is not available). I ran into this 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage 2.0 (TID 28) java.lang.UnsupportedOperationException: tail of empty list at scala.collection.immutable.Nil$.tail(List.scala:339) at scala.collection.immutable.Nil$.tail(List.scala:334) at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172) Code: args.map { option = { val pairs = option.split(=) arguments += pairs(0) - pairs(1) } } Error is at -. Does Spark use scala libs 2.10 at runtime ? Any suggestions for fix ? -- Deepak
Re: Error while running SparkPi in Hadoop HA
Solved Looks like it's some incompatibility in the build when using -Phadoop-2.4 , made the distribution with -Phadoop-provided and that fixed the issue On Tue, Apr 21, 2015 at 2:03 PM, Fernando O. fot...@gmail.com wrote: Hi all, I'm wondering if SparkPi works with hadoop HA (I guess it should) Hadoop's pi example works great on my cluster, so after having that done I installed spark and in the worker log I'm seeing two problems that might be related. Versions: Hadoop 2.6.0 Spark 1.3.1 I'm running : ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 2 --driver-memory 2g --executor-memory 1g --executor-cores 1 lib/spark-examples*.jar 10 It seems like it can't see the resource manager but I pinged and telnet it from the node and that works, also: hadoop's pi example works... So I'm not sure why spark is not seeing the RM 15/04/21 15:56:21 INFO yarn.YarnRMClient: Registering the ApplicationMaster*15/04/21 15:56:22 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 http://0.0.0.0/0.0.0.0:8030. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:23 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 http://0.0.0.0/0.0.0.0:8030. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)* 15/04/21 15:56:24 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:25 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:26 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:27 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:28 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:29 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:30 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:31 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. 
Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:51 INFO cluster.YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms) 15/04/21 15:56:51 INFO cluster.YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/04/21 15:56:51 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:35 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Got job 0 (reduce at SparkPi.scala:35) with 10 output partitions (allowLocal=false) 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce at SparkPi.scala:35) 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Missing parents: List() 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:31), which has no missing parents 15/04/21 15:56:51 INFO cluster.YarnClusterScheduler: Cancelling stage 0 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Stage 0 (reduce at SparkPi.scala:35) failed in Unknown s 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Job 0 failed: reduce at SparkPi.scala:35, took 0.121974 s*15/04/21 15:56:51 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
Re: SparkSQL performance
Here is an example using rows directly: https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#programmatically-specifying-the-schema Avro or parquet input would likely give you the best performance. On Tue, Apr 21, 2015 at 4:28 AM, Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com wrote: Thanks for the hints guys! much appreciated! Even if I just do a something like: Select * from tableX where attribute1 5 I see similar behaviour. @Michael Could you point me to any sample code that uses Spark's Rows? We are at a phase where we can actually change our JavaBeans for something that provides a better performance than what we are seeing now. Would you recommend using Avro presentation then? Thanks again! Renato M. 2015-04-21 1:18 GMT+02:00 Michael Armbrust mich...@databricks.com: There is a cost to converting from JavaBeans to Rows and this code path has not been optimized. That is likely what you are seeing. On Mon, Apr 20, 2015 at 3:55 PM, ayan guha guha.a...@gmail.com wrote: SparkSQL optimizes better by column pruning and predicate pushdown, primarily. Here you are not taking advantage of either. I am curious to know what goes in your filter function, as you are not using a filter in SQL side. Best Ayan On 21 Apr 2015 08:05, Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com wrote: Does anybody have an idea? a clue? a hint? Thanks! Renato M. 2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com: Hi all, I have a simple query Select * from tableX where attribute1 between 0 and 5 that I run over a Kryo file with four partitions that ends up being around 3.5 million rows in our case. If I run this query by doing a simple map().filter() it takes around ~9.6 seconds but when I apply schema, register the table into a SqlContext, and then run the query, it takes around ~16 seconds. This is using Spark 1.2.1 with Scala 2.10.0 I am wondering why there is such a big gap on performance if it is just a filter. Internally, the relation files are mapped to a JavaBean. This different data presentation (JavaBeans vs SparkSQL internal representation) could lead to such difference? Is there anything I could do to make the performance get closer to the hard-coded option? Thanks in advance for any suggestions or ideas. Renato M.
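A short Scala sketch of the rows-plus-explicit-schema approach from the linked guide (1.3 API; the column names are illustrative, sc/sqlContext are assumed to exist as in the shell, and the 1.2 equivalent of createDataFrame is applySchema):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Build an RDD[Row], describe it with an explicit StructType, and query it,
// instead of going through JavaBean conversion.
val schema = StructType(Seq(
  StructField("attribute1", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))

val rowRdd = sc.parallelize(Seq(Row(1, "a"), Row(7, "b"), Row(3, "c")))
val df = sqlContext.createDataFrame(rowRdd, schema)
df.registerTempTable("tableX")

sqlContext.sql("SELECT * FROM tableX WHERE attribute1 BETWEEN 0 AND 5").show()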
Spark Scala Version?
While running my Spark application on 1.3.0 with Scala 2.10.0 I encountered 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage 2.0 (TID 28) java.lang.UnsupportedOperationException: tail of empty list at scala.collection.immutable.Nil$.tail(List.scala:339) at scala.collection.immutable.Nil$.tail(List.scala:334) at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172) My app uses quite a lot of reflection as I need to build an Avro schema. I was told on Stack Overflow that this error occurs because reflection is not thread safe in Scala 2.10 and was advised to use Scala 2.11. Hence I moved to Scala 2.11 and moved all Spark Maven dependencies to 2.11 (spark-avro is still 2.10 as 2.11 is not available). I ran into this 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage 2.0 (TID 28) java.lang.UnsupportedOperationException: tail of empty list at scala.collection.immutable.Nil$.tail(List.scala:339) at scala.collection.immutable.Nil$.tail(List.scala:334) at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172) Code: args.map { option => { val pairs = option.split("="); arguments += pairs(0) -> pairs(1) } } The error is at the -> line. Does Spark use the Scala 2.10 libs at runtime? Any suggestions for a fix? -- Deepak
Re: Updating a Column in a DataFrame
You can use df.withColumn("a", df.b) to make column a have the same value as column b. On Mon, Apr 20, 2015 at 3:38 PM, ARose ashley.r...@telarix.com wrote: In my Java application, I want to update the values of a Column in a given DataFrame. However, I realize DataFrames are immutable, and therefore cannot be updated by conventional means. Is there a workaround for this sort of transformation? If so, can someone provide an example? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Updating-a-Column-in-a-DataFrame-tp22578.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
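A Scala sketch of the same idea, assuming an existing DataFrame df with a numeric column b:

// DataFrames are immutable, so an "update" derives a new DataFrame rather than
// mutating the existing one; here a column "a" is computed from column "b".
val updated = df.withColumn("a", df("b") * 2)
updated.show()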
Re: Spark 1.3.1 Dataframe breaking ALS.train?
SchemaRDD subclasses RDD in 1.2, but DataFrame is no longer an RDD in 1.3. We should allow DataFrames in ALS.train. I will submit a patch. You can use `ALS.train(training.rdd, ...)` for now as a workaround. -Xiangrui On Tue, Apr 21, 2015 at 10:51 AM, Joseph Bradley jos...@databricks.com wrote: Hi Ayan, If you want to use DataFrame, then you should use the Pipelines API (org.apache.spark.ml.*) which will take DataFrames: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS In the examples/ directory for ml/, you can find a MovieLensALS example. Good luck! Joseph On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote: Hi I am getting an error Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD It was working fine in 1.2.0 (till last night :)) Any solution? I am thinking to map the training dataframe back to a RDD, byt will lose the schema information. Best Ayan On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com wrote: Hi Just upgraded to Spark 1.3.1. I am getting an warning Warning (from warnings module): File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py, line 191 warnings.warn(inferSchema is deprecated, please use createDataFrame instead) UserWarning: inferSchema is deprecated, please use createDataFrame instead However, documentation still says to use inferSchema. Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in section Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) 
Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Rank:8 Lmbda:1.0 iteration:10 Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD -- Best Regards, Ayan Guha -- Best Regards, Ayan Guha - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Join on DataFrames from the same source (Pyspark)
This is https://issues.apache.org/jira/browse/SPARK-6231 Unfortunately this is pretty hard to fix as its hard for us to differentiate these without aliases. However you can add an alias as follows: from pyspark.sql.functions import * df.alias(a).join(df.alias(b), col(a.col1) == col(b.col1)) On Tue, Apr 21, 2015 at 8:10 AM, Karlson ksonsp...@siberie.de wrote: Sorry, my code actually was df_one = df.select('col1', 'col2') df_two = df.select('col1', 'col3') But in Spark 1.4.0 this does not seem to make any difference anyway and the problem is the same with both versions. On 2015-04-21 17:04, ayan guha wrote: your code should be df_one = df.select('col1', 'col2') df_two = df.select('col1', 'col3') Your current code is generating a tupple, and of course df_1 and df_2 are different, so join is yielding to cartesian. Best Ayan On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de wrote: Hi, can anyone confirm (and if so elaborate on) the following problem? When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example: I load a DataFrame with n rows from disk: df = sql_context.parquetFile('data.parquet') Then I create two DataFrames from that source. df_one = df.select(['col1', 'col2']) df_two = df.select(['col1', 'col3']) Finally I want to (inner) join them back together: df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner') The key in col1 is unique. The resulting DataFrame should have n rows, however it does have n*n rows. That does not happen, when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
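A Scala analogue of the aliasing workaround above, assuming a DataFrame df with a column col1:

import org.apache.spark.sql.functions.col

// Alias each side of the self-join so the otherwise-identical columns can be
// told apart in the join condition.
val joined = df.as("a").join(df.as("b"), col("a.col1") === col("b.col1"), "inner")
joined.show()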
Re: Column renaming after DataFrame.groupBy
You can use the more verbose syntax: d.groupBy("_1").agg(d("_1"), sum("_1").as("sum_1"), sum("_2").as("sum_2")) On Tue, Apr 21, 2015 at 1:06 AM, Justin Yip yipjus...@prediction.io wrote: Hello, I would like to rename a column after aggregation. In the following code, the column name is SUM(_1#179), is there a way to rename it to a more friendly name? scala> val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10))) scala> d.groupBy("_1").sum().printSchema root |-- _1: integer (nullable = false) |-- SUM(_1#179): long (nullable = true) |-- SUM(_2#180): long (nullable = true) Thanks. Justin -- View this message in context: Column renaming after DataFrame.groupBy http://apache-spark-user-list.1001560.n3.nabble.com/Column-renaming-after-DataFrame-groupBy-tp22586.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Spark 1.3.1 Dataframe breaking ALS.train?
Hi Ayan, If you want to use DataFrame, then you should use the Pipelines API (org.apache.spark.ml.*) which will take DataFrames: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS In the examples/ directory for ml/, you can find a MovieLensALS example. Good luck! Joseph On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote: Hi I am getting an error Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD It was working fine in 1.2.0 (till last night :)) Any solution? I am thinking to map the training dataframe back to a RDD, byt will lose the schema information. Best Ayan On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com wrote: Hi Just upgraded to Spark 1.3.1. I am getting an warning Warning (from warnings module): File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py, line 191 warnings.warn(inferSchema is deprecated, please use createDataFrame instead) UserWarning: inferSchema is deprecated, please use createDataFrame instead However, documentation still says to use inferSchema. Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in section Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Rank:8 Lmbda:1.0 iteration:10 Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD -- Best Regards, Ayan Guha -- Best Regards, Ayan Guha
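For reference, a Scala sketch of the Pipelines ALS that accepts a DataFrame directly (the thread itself uses PySpark; training is assumed to be a DataFrame with userId, movieId and rating columns, and the parameter values mirror the question):

import org.apache.spark.ml.recommendation.ALS

// The org.apache.spark.ml ALS takes a DataFrame, unlike mllib's RDD-based API.
val als = new ALS()
  .setRank(8)
  .setMaxIter(10)
  .setRegParam(1.0)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

val model = als.fit(training)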
Re: Instantiating/starting Spark jobs programmatically
On 21 Apr 2015, at 17:34, Richard Marscher rmarsc...@localytics.com wrote: - There are System.exit calls built into Spark as of now that could kill your running JVM. We have shadowed some of the most offensive bits within our own application to work around this. You'd likely want to do that or to do your own Spark fork. For example, if the SparkContext can't connect to your cluster master node when it is created, it will System.exit. People can block errant System.exit calls by running under a SecurityManager. Less than ideal (and there's a small performance hit), but possible.
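A rough sketch of the SecurityManager approach, in Scala; this is a workaround with the caveats noted above, not something Spark itself provides:

// Block errant System.exit calls: checkExit throws instead of letting the JVM
// die, while all other permission checks are left open. Install it in the
// driver before creating the SparkContext.
class NoExitSecurityManager extends SecurityManager {
  override def checkExit(status: Int): Unit =
    throw new SecurityException(s"Blocked System.exit($status)")
  override def checkPermission(perm: java.security.Permission): Unit = ()
}

System.setSecurityManager(new NoExitSecurityManager)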
Re: Spark Unit Testing
Hi Emre, thanks for the help will have a look. Cheers! On Tue, Apr 21, 2015 at 1:46 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello James, Did you check the following resources: - https://github.com/apache/spark/tree/master/streaming/src/test/java/org/apache/spark/streaming - http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs -- Emre Sevinç http://www.bigindustries.be/ On Tue, Apr 21, 2015 at 1:26 PM, James King jakwebin...@gmail.com wrote: I'm trying to write some unit tests for my spark code. I need to pass a JavaPairDStreamString, String to my spark class. Is there a way to create a JavaPairDStream using Java API? Also is there a good resource that covers an approach (or approaches) for unit testing using Java. Regards jk -- Emre Sevinc
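One common test fixture, sketched in Scala (the Java API has an equivalent queueStream on JavaStreamingContext): hand-built pair RDDs are pushed through a DStream so streaming code can be exercised without a real source.

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("dstream-test")
val ssc = new StreamingContext(conf, Seconds(1))

// Each RDD in the queue becomes one batch of the resulting pair DStream.
val rddQueue = mutable.Queue[RDD[(String, String)]](
  ssc.sparkContext.parallelize(Seq("k1" -> "v1", "k2" -> "v2")))

val pairStream = ssc.queueStream(rddQueue)
pairStream.print()

ssc.start()
Thread.sleep(2000)   // let at least one batch run in this simple sketch
ssc.stop()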
Column renaming after DataFrame.groupBy
Hello, I would like to rename a column after aggregation. In the following code, the column name is SUM(_1#179), is there a way to rename it to a more friendly name? scala> val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10))) scala> d.groupBy("_1").sum().printSchema root |-- _1: integer (nullable = false) |-- SUM(_1#179): long (nullable = true) |-- SUM(_2#180): long (nullable = true) Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Column-renaming-after-DataFrame-groupBy-tp22586.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Compression and Hive with Spark 1.3
Some more info: I'm putting the compressions values on hive-site.xml and running spark job. hc.sql(set ) returns the expected (compression) configuration but looking at the logs, it create the tables without compression: 15/04/21 13:14:19 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:core_secm_instr_21042015_131411_tmp, dbName:default, owner:hadoop, createTime:1429622059, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring, comment:from deserializer)], location:null, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe, parameters:{serialization.format=1, path=hdfs:// 10.166.157.97:9000/user/hive/warehouse/core_secm_instr_21042015_131411_tmp}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{})), partitionKeys:[], parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:sid,type:integer,nullable:true,metadata:{}},{name:typeid,type:integer,nullable:true,metadata:{}},{name:symbol,type:string,nullable:true,metadata:{}},{name:name,type:string,nullable:true,metadata:{}},{name:beginDT,type:long,nullable:true,metadata:{}},{name:endDT,type:long,nullable:true,metadata:{}}]}, EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=org.apache.spark.sql.parquet}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE) 15/04/21 13:14:19 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=create_table: Table(tableName:core_secm_instr_21042015_131411_tmp, dbName:default, owner:hadoop, createTime:1429622059, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring, comment:from deserializer)], location:null, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe, parameters:{serialization.format=1, path=hdfs:// 10.166.157.97:9000/user/hive/warehouse/core_secm_instr_21042015_131411_tmp}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{})), partitionKeys:[], parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:sid,type:integer,nullable:true,metadata:{}},{name:typeid,type:integer,nullable:true,metadata:{}},{name:symbol,type:string,nullable:true,metadata:{}},{name:name,type:string,nullable:true,metadata:{}},{name:beginDT,type:long,nullable:true,metadata:{}},{name:endDT,type:long,nullable:true,metadata:{}}]}, EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=org.apache.spark.sql.parquet}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE) On Tue, Apr 21, 2015 at 12:40 PM, Ophir Cohen oph...@gmail.com wrote: Sadly I'm encounter too many issues migrating my code to Spark 1.3 I wrote one problem on other mail but my main problem is that I can't set the right compression type. 
In Spark 1.2.1 setting the following values was enough: hc.setConf("hive.exec.compress.output", "true") hc.setConf("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec") hc.setConf("mapreduce.output.fileoutputformat.compress.type", "BLOCK") Running it on the new cluster I: 1. Get the files uncompressed and named *.parquet 2. When trying to explore it using the Hive CLI I get the following exception: Failed with exception java.io.IOException:java.io.IOException: hdfs://10.166.157.97:9000/user/hive/warehouse/core_equity_corp_splits_divs/part-r-1.parquet not a SequenceFile. Time taken: 0.538 seconds 3. Running the same query from the Spark shell yields empty results. Please advise
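One thing that may be worth checking (an assumption, not a confirmed fix): the create_table log above shows the table being written through the Parquet data source (spark.sql.sources.provider=org.apache.spark.sql.parquet), and Parquet output is controlled by its own codec setting rather than the Hive/MapReduce output-compression properties:

// Hypothetical check: set the Parquet codec explicitly before writing.
// Accepted values are uncompressed, snappy, gzip and lzo.
hc.setConf("spark.sql.parquet.compression.codec", "snappy")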
Re: Shuffle files not cleaned up (Spark 1.2.1)
We already do have a cron job in place to clean just the shuffle files. However, what I would really like to know is whether there is a proper way of telling spark to clean up these files once its done with them? Thanks NB On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com wrote: Write a crone job for this like below 12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+ 32 * * * * find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ 52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote: Hi all, I had posed this query as part of a different thread but did not get a response there. So creating a new thread hoping to catch someone's attention. We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours otherwise it will eat up all our disk space. Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1. https://github.com/apache/spark/pull/5074/files https://issues.apache.org/jira/browse/SPARK-5836 Also, for some reason, the following JIRAs that were reported as functional issues were closed as Duplicates of the above Documentation bug. Does this mean that this issue won't be tackled at all? https://issues.apache.org/jira/browse/SPARK-3563 https://issues.apache.org/jira/browse/SPARK-4796 https://issues.apache.org/jira/browse/SPARK-6011 Any further insight into whether this is being looked into and meanwhile how to handle shuffle files will be greatly appreciated. Thanks NB
Re: Running spark over HDFS
On Tuesday 21 April 2015 12:12 PM, Akhil Das wrote: Your spark master should be spark://swetha:7077 :) Thanks Best Regards On Mon, Apr 20, 2015 at 2:44 PM, madhvi madhvi.gu...@orkash.com mailto:madhvi.gu...@orkash.com wrote: PFA screenshot of my cluster UI Thanks On Monday 20 April 2015 02:27 PM, Akhil Das wrote: Are you seeing your task being submitted to the UI? Under completed or running tasks? How much resources are you allocating for your job? Can you share a screenshot of your cluster UI and the code snippet that you are trying to run? Thanks Best Regards On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com mailto:madhvi.gu...@orkash.com wrote: Hi, I Did the same you told but now it is giving the following error: ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up. On UI it is showing that master is working Thanks Madhvi On Monday 20 April 2015 12:28 PM, Akhil Das wrote: In your eclipse, while you create your SparkContext, set the master uri as shown in the web UI's top left corner like: spark://someIPorHost:7077 and it should be fine. Thanks Best Regards On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com mailto:madhvi.gu...@orkash.com wrote: Hi All, I am new to spark and have installed spark cluster over my system having hadoop cluster.I want to process data stored in HDFS through spark. When I am running code in eclipse it is giving the following warning repeatedly: scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources. I have made changes to spark-env.sh file as below export SPARK_WORKER_INSTANCES=1 export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop export SPARK_WORKER_MEMORY=1g export SPARK_WORKER_CORES=2 export SPARK_EXECUTOR_MEMORY=1g I am running the spark standalone cluster.In cluster UI it is showing all workers with allocated resources but still its not working. what other configurations are needed to be changed? Thanks Madhvi Gupta - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org Thanks Akhil, It worked fine after replacing IP with the hostname and running the code by making jar of it by spark submit Madhvi
Re: Parquet Hive table become very slow on 1.3?
We have the similar issue with massive parquet files, Cheng Lian, could you have a look? 2015-04-08 15:47 GMT+08:00 Zheng, Xudong dong...@gmail.com: Hi Cheng, I tried both these patches, and seems still not resolve my issue. And I found the most time is spend on this line in newParquet.scala: ParquetFileReader.readAllFootersInParallel( sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData) Which need read all the files under the Parquet folder, while our Parquet folder has a lot of Parquet files (near 2000), read one file need about 2 seconds, so it become very slow ... And the PR 5231 did not skip this steps so it not resolve my issue. As our Parquet files are generated by a Spark job, so the number of .parquet files is same with the number of tasks, that is why we have so many files. But these files actually have the same schema. Is there any way to merge these files into one, or avoid scan each of them? On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Xudong, We had been digging this issue for a while, and believe PR 5339 http://github.com/apache/spark/pull/5339 and PR 5334 http://github.com/apache/spark/pull/5339 should fix this issue. There two problems: 1. Normally we cache Parquet table metadata for better performance, but when converting Hive metastore Hive tables, the cache is not used. Thus heavy operations like schema discovery is done every time a metastore Parquet table is converted. 2. With Parquet task side metadata reading (which is turned on by default), we can actually skip the row group information in the footer. However, we accidentally called a Parquet function which doesn't skip row group information. For your question about schema merging, Parquet allows different part-files have different but compatible schemas. For example, part-1.parquet has columns a and b, while part-2.parquet may has columns a and c. In some cases, the summary files (_metadata and _common_metadata) contains the merged schema (a, b, and c), but it's not guaranteed. For example, when the user defined metadata stored different part-files contain different values for the same key, Parquet simply gives up writing summary files. That's why all part-files must be touched to get a precise merged schema. However, in scenarios where a centralized arbitrative schema is available (e.g. Hive metastore schema, or the schema provided by user via data source DDL), we don't need to do schema merging on driver side, but defer it to executor side and each task only needs to reconcile those part-files it needs to touch. This is also what the Parquet developers did recently for parquet-hadoop https://github.com/apache/incubator-parquet-mr/pull/45. Cheng On 3/31/15 11:49 PM, Zheng, Xudong wrote: Thanks Cheng! Set 'spark.sql.parquet.useDataSourceApi' to false resolves my issues, but the PR 5231 seems not. Not sure any other things I did wrong ... BTW, actually, we are very interested in the schema merging feature in Spark 1.3, so both these two solution will disable this feature, right? It seems that Parquet metadata is store in a file named _metadata in the Parquet file folder (each folder is a partition as we use partition table), why we need scan all Parquet part files? Is there any other solutions could keep schema merging feature at the same time? We are really like this feature :) On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com wrote: Hi Xudong, This is probably because of Parquet schema merging is turned on by default. 
This is generally useful for Parquet files with different but compatible schemas. But it needs to read metadata from all Parquet part-files. This can be problematic when reading Parquet files with lots of part-files, especially when the user doesn't need schema merging. This issue is tracked by SPARK-6575, and here is a PR for it: https://github.com/apache/spark/pull/5231. This PR adds a configuration to disable schema merging by default when doing Hive metastore Parquet table conversion. Another workaround is to fallback to the old Parquet code by setting spark.sql.parquet.useDataSourceApi to false. Cheng On 3/31/15 2:47 PM, Zheng, Xudong wrote: Hi all, We are using Parquet Hive table, and we are upgrading to Spark 1.3. But we find that, just a simple COUNT(*) query will much slower (100x) than Spark 1.2. I find the most time spent on driver to get HDFS blocks. I find large amount of get below logs printed: 15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms 15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{ fileLength=77153436 underConstruction=false blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
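For reference, the fallback mentioned above expressed as configuration (assuming a sqlContext as in the shell):

// Fall back to the old, non data source Parquet code path.
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")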
Features scaling
Hi! I want to normalize features before training logistic regression. I set up the scaler: scaler2 = StandardScaler(withMean=True, withStd=True).fit(features) and apply it to a dataset: scaledData = dataset.map(lambda x: LabeledPoint(x.label, scaler2.transform(Vectors.dense(x.features.toArray())))) but I can't work with scaledData (can't output it or train a regression on it); I get an error: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. Is this the correct way to do the normalization? Why doesn't it work? Any advice is welcome. Thanks. Full code: https://gist.github.com/dkozyr/d31551a3ebed0ee17772 Console output: https://gist.github.com/dkozyr/199f0d4f44cf522f9453 Denys - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Can't get SparkListener to work
You need to call sc.stop() to wait for the notifications to be processed. Best Regards, Shixiong(Ryan) Zhu 2015-04-21 4:18 GMT+08:00 Praveen Balaji secondorderpolynom...@gmail.com: Thanks Shixiong. I tried it out and it works. If you're looking at this post, here a few points you may be interested in: Turns out this is has to do with the difference between methods and function in scala - something I didn't pay attention to before. If you're looking at this thread, this may be an interesting post: http://jim-mcbeath.blogspot.com/2009/05/scala-functions-vs-methods.html Below is some test code. I added the Thread.sleep because it looks like Spark notifications happen asynchronously and the main/driver thread wont wait for the notifications to be complete. I'll look at that further later, but for now that's my inference, so don't take my word for it yet. Here's the code: object TestME { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(testme); val sc = new SparkContext(conf); try { foo(sc); } finally { Thread.sleep(2000); } } def foo(sc: SparkContext) = { sc.addSparkListener(new SparkListener() { override def onTaskStart(e: SparkListenerTaskStart) = println( onTaskStart); override def onTaskEnd(e: SparkListenerTaskEnd) = println( onTaskEnd); }); sc.parallelize(List(1, 2, 3)).map(i = throw new SparkException(test)).collect(); } } I'm running it from Eclipse on local[*]. On Sun, Apr 19, 2015 at 7:57 PM, Praveen Balaji secondorderpolynom...@gmail.com wrote: Thanks Shixiong. I'll try this. On Sun, Apr 19, 2015, 7:36 PM Shixiong Zhu zsxw...@gmail.com wrote: The problem is the code you use to test: sc.parallelize(List(1, 2, 3)).map(throw new SparkException(test)).collect(); is like the following example: def foo: Int = Nothing = { throw new SparkException(test) } sc.parallelize(List(1, 2, 3)).map(foo).collect(); So actually the Spark jobs do not be submitted since it fails in `foo` that is used to create the map function. Change it to sc.parallelize(List(1, 2, 3)).map(i = throw new SparkException(test)).collect(); And you will see the correct messages from your listener. Best Regards, Shixiong(Ryan) Zhu 2015-04-19 1:06 GMT+08:00 Praveen Balaji secondorderpolynom...@gmail.com: Thanks for the response, Archit. I get callbacks when I do not throw an exception from map. My use case, however, is to get callbacks for exceptions in transformations on executors. Do you think I'm going down the right route? Cheers -p On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur archit279tha...@gmail.com wrote: Hi Praveen, Can you try once removing throw exception in map. Do you still not get it.? On Apr 18, 2015 8:14 AM, Praveen Balaji secondorderpolynom...@gmail.com wrote: Thanks for the response, Imran. I probably chose the wrong methods for this email. I implemented all methods of SparkListener and the only callback I get is onExecutorMetricsUpdate. 
Here's the complete code: == import org.apache.spark.scheduler._ sc.addSparkListener(new SparkListener() { override def onStageCompleted(e: SparkListenerStageCompleted) = println( onStageCompleted); override def onStageSubmitted(e: SparkListenerStageSubmitted) = println( onStageSubmitted); override def onTaskStart(e: SparkListenerTaskStart) = println( onTaskStart); override def onTaskGettingResult(e: SparkListenerTaskGettingResult) = println( onTaskGettingResult); override def onTaskEnd(e: SparkListenerTaskEnd) = println( onTaskEnd); override def onJobStart(e: SparkListenerJobStart) = println( onJobStart); override def onJobEnd(e: SparkListenerJobEnd) = println( onJobEnd); override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) = println( onEnvironmentUpdate); override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = println( onBlockManagerAdded); override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = println( onBlockManagerRemoved); override def onUnpersistRDD(e: SparkListenerUnpersistRDD) = println( onUnpersistRDD); override def onApplicationStart(e: SparkListenerApplicationStart) = println( onApplicationStart); override def onApplicationEnd(e: SparkListenerApplicationEnd) = println( onApplicationEnd); override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) = println( onExecutorMetricsUpdate); }); sc.parallelize(List(1, 2, 3)).map(throw new SparkException(test)).collect(); = On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid iras...@cloudera.com wrote: when you start the spark-shell, its already too late to get the ApplicationStart event. Try listening for StageCompleted or JobEnd instead. On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji secondorderpolynom...@gmail.com
Re: Spark and accumulo
Hello Madhvi, Some work has been done by @pomadchin using the spark notebook, maybe you should come on https://gitter.im/andypetrella/spark-notebook and poke him? There are some discoveries he made that might be helpful to know. Also you can poke @lossyrob from Azavea, he did that for geotrellis. my 0.2c, andy On Tue, Apr 21, 2015 at 9:25 AM Akhil Das ak...@sigmoidanalytics.com wrote: You can simply use a custom inputformat (AccumuloInputFormat) with the hadoop RDDs (sc.newApiHadoopFile etc) for that, all you need to do is to pass the jobConfs. Here's a pretty clean discussion: http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook#answers-header Thanks Best Regards On Tue, Apr 21, 2015 at 9:55 AM, madhvi madhvi.gu...@orkash.com wrote: Hi all, Is there anything to integrate spark with accumulo or make spark to process over accumulo data? Thanks Madhvi Gupta - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
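A rough sketch of the wiring Akhil describes, assuming Accumulo's core client jar is on the classpath and an existing SparkContext sc. The Accumulo-specific configuration (instance name, zookeepers, connector info, input table) is left as a comment because the exact static setters and the classes that host them vary between Accumulo versions; the Stack Overflow thread linked above walks through them for 1.6.

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.hadoop.mapreduce.Job

val job = Job.getInstance(sc.hadoopConfiguration)
// Configure AccumuloInputFormat on `job` here (instance, zookeepers, user/token,
// input table) using the static setters from the Accumulo mapreduce API.

val accumuloRDD = sc.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[AccumuloInputFormat],
  classOf[Key],
  classOf[Value])
println("rows read from Accumulo: " + accumuloRDD.count())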
Different behavioural of HiveContext vs. Hive?
Lately we upgraded our Spark to 1.3. Not surprisingly, along the way I found a few incompatibilities between the versions, which was quite expected. I found one change whose origin I'd like to understand. env: Amazon EMR, Spark 1.3, Hive 0.13, Hadoop 2.4. In Spark 1.2.1 I ran a query from the code such as: SHOW TABLES LIKE '*tmp*' The query gave me results both from the code and from the Hive CLI. After upgrading to Spark 1.3, it still works from the Hive CLI, but I get the below exception from Spark. Changing to other formats such as SHOW TABLES *.tmp or SHOW TABLES in default *.tmp yields the same exception... Any ideas?

hc.sql("SHOW TABLES LIKE '*tmp*'")
java.lang.RuntimeException: [1.13] failure: ``in'' expected but identifier LIKE found
SHOW TABLES LIKE '*tmp*'
            ^
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:40)
  at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala
Re: Running spark over HDFS
Your spark master should be spark://swetha:7077 :) Thanks Best Regards On Mon, Apr 20, 2015 at 2:44 PM, madhvi madhvi.gu...@orkash.com wrote: PFA screenshot of my cluster UI Thanks On Monday 20 April 2015 02:27 PM, Akhil Das wrote: Are you seeing your task being submitted to the UI? Under completed or running tasks? How much resources are you allocating for your job? Can you share a screenshot of your cluster UI and the code snippet that you are trying to run? Thanks Best Regards On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com wrote: Hi, I Did the same you told but now it is giving the following error: ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up. On UI it is showing that master is working Thanks Madhvi On Monday 20 April 2015 12:28 PM, Akhil Das wrote: In your eclipse, while you create your SparkContext, set the master uri as shown in the web UI's top left corner like: spark://someIPorHost:7077 and it should be fine. Thanks Best Regards On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote: Hi All, I am new to spark and have installed spark cluster over my system having hadoop cluster.I want to process data stored in HDFS through spark. When I am running code in eclipse it is giving the following warning repeatedly: scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources. I have made changes to spark-env.sh file as below export SPARK_WORKER_INSTANCES=1 export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop export SPARK_WORKER_MEMORY=1g export SPARK_WORKER_CORES=2 export SPARK_EXECUTOR_MEMORY=1g I am running the spark standalone cluster.In cluster UI it is showing all workers with allocated resources but still its not working. what other configurations are needed to be changed? Thanks Madhvi Gupta - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
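For completeness, a small sketch of what that looks like when the context is created from the IDE; the hostnames, port and HDFS path below are placeholders, and the master URL must match the spark://host:port string shown at the top-left of the standalone master's web UI.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("hdfs-from-eclipse")
  .setMaster("spark://your-master-host:7077")   // copy this from the master web UI
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://your-namenode:8020/path/to/input")
println("line count: " + lines.count())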
Re: Different behavioural of HiveContext vs. Hive?
BTW, this: hc.sql("show tables").collect works great! On Tue, Apr 21, 2015 at 10:49 AM, Ophir Cohen oph...@gmail.com wrote: Lately we upgraded our Spark to 1.3. Not surprisingly, along the way I found a few incompatibilities between the versions, which was quite expected. I found one change whose origin I'd like to understand. env: Amazon EMR, Spark 1.3, Hive 0.13, Hadoop 2.4. In Spark 1.2.1 I ran a query from the code such as: SHOW TABLES LIKE '*tmp*' The query gave me results both from the code and from the Hive CLI. After upgrading to Spark 1.3, it still works from the Hive CLI, but I get the below exception from Spark. Changing to other formats such as SHOW TABLES *.tmp or SHOW TABLES in default *.tmp yields the same exception... Any ideas?

hc.sql("SHOW TABLES LIKE '*tmp*'")
java.lang.RuntimeException: [1.13] failure: ``in'' expected but identifier LIKE found
SHOW TABLES LIKE '*tmp*'
            ^
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:40)
  at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala
Re: MLlib - Collaborative Filtering - trainImplicit task size
I think maybe you need more partitions in your input, which might make for smaller tasks? On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone christian.per...@gmail.com wrote: I keep seeing these warnings when using trainImplicit: WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. And then the task size starts to increase. Is this a known issue ? Thanks ! -- Blog | Github | Twitter Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
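A minimal sketch of Sean's suggestion, assuming an existing SparkContext sc; the ratings and the partition count are made up, the point is only the repartition() before training.

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Toy ratings; in practice this is the real implicit-feedback RDD.
val raw = sc.parallelize(Seq(Rating(1, 10, 1.0), Rating(1, 20, 3.0), Rating(2, 10, 2.0)))
// More, smaller partitions generally mean smaller serialized tasks.
val ratings = raw.repartition(200).cache()
val model = ALS.trainImplicit(ratings, 10, 10)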
Cassandra Connection Issue with Spark-jobserver
*I am new to Spark world and Job Server My Code :* package spark.jobserver import java.nio.ByteBuffer import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer import scala.collection.immutable.Map import org.apache.cassandra.hadoop.ConfigHelper import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat import org.apache.cassandra.hadoop.cql3.CqlConfigHelper import org.apache.cassandra.hadoop.cql3.CqlOutputFormat import org.apache.cassandra.utils.ByteBufferUtil import org.apache.hadoop.mapreduce.Job import com.typesafe.config.{Config, ConfigFactory} import org.apache.spark._ import org.apache.spark.SparkContext._ import scala.util.Try object CassandraCQLTest extends SparkJob{ def main(args: Array[String]) { val sc = new SparkContext(local[4], CassandraCQLTest) sc.addJar(/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar); val config = ConfigFactory.parseString() val results = runJob(sc, config) println(Result is + test) } override def validate(sc: SparkContext, config: Config): SparkJobValidation = { Try(config.getString(input.string)) .map(x = SparkJobValid) .getOrElse(SparkJobInvalid(No input.string config param)) } override def runJob(sc: SparkContext, config: Config): Any = { val cHost: String = localhost val cPort: String = 9160 val KeySpace = retail val InputColumnFamily = ordercf val OutputColumnFamily = salecount val job = new Job() job.setInputFormatClass(classOf[CqlPagingInputFormat]) ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost) ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort) ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily) ConfigHelper.setInputPartitioner(job.getConfiguration(), Murmur3Partitioner) CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), 3) /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), user_id='bob') */ /** An UPDATE writes one or more columns to a record in a Cassandra column family */ val query = UPDATE + KeySpace + . + OutputColumnFamily + SET sale_count = ? 
CqlConfigHelper.setOutputCql(job.getConfiguration(), query) job.setOutputFormatClass(classOf[CqlOutputFormat]) ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily) ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost) ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort) ConfigHelper.setOutputPartitioner(job.getConfiguration(), Murmur3Partitioner) val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[CqlPagingInputFormat], classOf[java.util.Map[String,ByteBuffer]], classOf[java.util.Map[String,ByteBuffer]]) val productSaleRDD = casRdd.map { case (key, value) = { (ByteBufferUtil.string(value.get(prod_id)), ByteBufferUtil.toInt(value.get(quantity))) } } val aggregatedRDD = productSaleRDD.reduceByKey(_ + _) aggregatedRDD.collect().foreach { case (productId, saleCount) = println(productId + : + saleCount) } val casoutputCF = aggregatedRDD.map { case (productId, saleCount) = { val outColFamKey = Map(prod_id - ByteBufferUtil.bytes(productId)) val outKey: java.util.Map[String, ByteBuffer] = outColFamKey var outColFamVal = new ListBuffer[ByteBuffer] outColFamVal += ByteBufferUtil.bytes(saleCount) val outVal: java.util.List[ByteBuffer] = outColFamVal (outKey, outVal) } } casoutputCF.saveAsNewAPIHadoopFile( KeySpace, classOf[java.util.Map[String, ByteBuffer]], classOf[java.util.List[ByteBuffer]], classOf[CqlOutputFormat], job.getConfiguration() ) casRdd.count } } *When I push the Jar using spark-jobServer and execute it I get this on spark-jobserver terminal * job-server[ERROR] Exception in thread pool-1-thread-1 java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46) job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21) job-server[ERROR] at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235) job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) job-server[ERROR] at
Re: Custom Partitioning Spark
Hi Archit, Thanks a lot for your reply. I am using rdd.partitions.length to check the number of partitions. rdd.partitions return the array of partitions. I would like to add one more question here do you have any idea how to get the objects in each partition ? Further is there any way to figure out which particular partitions an object bleongs ? Thanks, On Tue, Apr 21, 2015 at 12:16 PM, Archit Thakur archit279tha...@gmail.com wrote: Hi, This should work. How are you checking the no. of partitions.? Thanks and Regards, Archit Thakur. On Mon, Apr 20, 2015 at 7:26 PM, mas mas.ha...@gmail.com wrote: Hi, I aim to do custom partitioning on a text file. I first convert it into pairRDD and then try to use my custom partitioner. However, somehow it is not working. My code snippet is given below. val file=sc.textFile(filePath) val locLines=file.map(line = line.split(\t)).map(line= ((line(2).toDouble,line(3).toDouble),line(5).toLong)) val ck=locLines.partitionBy(new HashPartitioner(50)) // new CustomPartitioner(50) -- none of the way is working here. while reading the file using textFile method it automatically partitions the file. However when i explicitly want to partition the new rdd locLines, It doesn't appear to do anything and even the number of partitions are same which is created by sc.textFile(). Any help in this regard will be highly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Muhammad Aamir *CONFIDENTIALITY:This email is intended solely for the person(s) named and may be confidential and/or privileged.If you are not the intended recipient,please delete it,notify me and do not copy,use,or disclose its content.*
Re: implicits is not a member of org.apache.spark.sql.SQLContext
Have you tried the following ? import sqlContext._ import sqlContext.implicits._ Cheers On Tue, Apr 21, 2015 at 7:54 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I tried to convert an RDD to a data frame using the example codes on spark website case class Person(name: String, age: Int) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val people = sc.textFile(examples/src/main/resources/people.txt).map(_.split(,)).map(p = *Person*(p(0), p(1).trim.toInt)).toDF() It compile fine using sbt. But in IntelliJ IDEA 14.0.3, it fail to compile and generate the following error Error:(289, 23) value *implicits is not a member of org.apache.spark.sql.SQLContext* import sqlContext.implicits._ ^ What is the problem here? Is there any work around (e.g. do explicit conversion)? Here is my build.sbt name := my-project version := 0.2 scalaVersion := 2.10.4 val sparkVersion = 1.3.1 val luceneVersion = 4.10.2 libraryDependencies = scalaVersion { scala_version = Seq( org.apache.spark %% spark-core % sparkVersion % provided, org.apache.spark %% spark-mllib % sparkVersion % provided, spark.jobserver % job-server-api % 0.4.1 % provided, org.scalatest %% scalatest % 2.2.1 % test ) } resolvers += Spark Packages Repo at http://dl.bintray.com/spark-packages/maven; resolvers += Resolver.mavenLocal Ningjun
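On the "explicit conversion" question: a sketch of the non-implicit route, which sidesteps the implicits import entirely by calling createDataFrame on the SQLContext (Spark 1.3 API); the file path is the one from the example above.

case class Person(name: String, age: Int)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
// Explicit conversion; no import of sqlContext.implicits._ needed.
val people = sqlContext.createDataFrame(peopleRDD)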
implicits is not a member of org.apache.spark.sql.SQLContext
I tried to convert an RDD to a data frame using the example code on the spark website:

case class Person(name: String, age: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

It compiles fine using sbt, but in IntelliJ IDEA 14.0.3 it fails to compile and generates the following error:

Error:(289, 23) value implicits is not a member of org.apache.spark.sql.SQLContext
import sqlContext.implicits._
                  ^

What is the problem here? Is there any work around (e.g. do explicit conversion)? Here is my build.sbt:

name := "my-project"
version := "0.2"
scalaVersion := "2.10.4"
val sparkVersion = "1.3.1"
val luceneVersion = "4.10.2"
libraryDependencies <++= scalaVersion { scala_version => Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
  "spark.jobserver" % "job-server-api" % "0.4.1" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.1" % "test"
) }
resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
resolvers += Resolver.mavenLocal

Ningjun
Re: Join on DataFrames from the same source (Pyspark)
Sorry, my code actually was df_one = df.select('col1', 'col2') df_two = df.select('col1', 'col3') But in Spark 1.4.0 this does not seem to make any difference anyway and the problem is the same with both versions. On 2015-04-21 17:04, ayan guha wrote: your code should be df_one = df.select('col1', 'col2') df_two = df.select('col1', 'col3') Your current code is generating a tupple, and of course df_1 and df_2 are different, so join is yielding to cartesian. Best Ayan On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de wrote: Hi, can anyone confirm (and if so elaborate on) the following problem? When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example: I load a DataFrame with n rows from disk: df = sql_context.parquetFile('data.parquet') Then I create two DataFrames from that source. df_one = df.select(['col1', 'col2']) df_two = df.select(['col1', 'col3']) Finally I want to (inner) join them back together: df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner') The key in col1 is unique. The resulting DataFrame should have n rows, however it does have n*n rows. That does not happen, when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Clustering algorithms in Spark
The problem with k-means is that we have to define the number of clusters up front, which I don't want in this case. So I'm thinking of something like hierarchical clustering. Any ideas or suggestions? On 21 April 2015 at 20:51, Jeetendra Gangele gangele...@gmail.com wrote: I have a requirement in which I want to match company names, and I am thinking of solving this using a clustering technique. Can anybody suggest which algorithm I should use in Spark, and how to evaluate the running time and accuracy for this particular problem? I checked k-means and it looks good. Any ideas or suggestions?
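If you do stay with k-means, one way to avoid hard-coding k is to sweep it and look for an elbow in the within-cluster cost. A sketch with made-up feature vectors (e.g. character n-gram counts derived from company names would stand in here):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Hypothetical numeric features; replace with vectors built from the company names.
val features = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 2.0), Vectors.dense(0.9, 0.1, 2.1),
  Vectors.dense(5.0, 3.0, 0.0), Vectors.dense(5.1, 2.9, 0.2),
  Vectors.dense(9.0, 9.0, 9.0), Vectors.dense(8.8, 9.2, 9.1)
)).cache()

// Sweep k and look for an "elbow" in the cost curve instead of fixing k up front.
(2 to 5).foreach { k =>
  val model = KMeans.train(features, k, 20)
  println(s"k=$k cost=${model.computeCost(features)}")
}

This is not hierarchical clustering, only a way to pick k empirically; MLlib in this Spark generation does not ship a hierarchical clusterer out of the box.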
Re: Join on DataFrames from the same source (Pyspark)
Your code should be df_one = df.select('col1', 'col2') and df_two = df.select('col1', 'col3'). Your current code is generating a tuple, and of course df_1 and df_2 are different, so the join is yielding a cartesian product. Best Ayan On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de wrote: Hi, can anyone confirm (and if so elaborate on) the following problem? When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example: I load a DataFrame with n rows from disk: df = sql_context.parquetFile('data.parquet') Then I create two DataFrames from that source. df_one = df.select(['col1', 'col2']) df_two = df.select(['col1', 'col3']) Finally I want to (inner) join them back together: df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner') The key in col1 is unique. The resulting DataFrame should have n rows, however it does have n*n rows. That does not happen when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards, Ayan Guha
Re: Custom Partitioning Spark
Are you looking for these?

mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.

mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.

On Wed, Apr 22, 2015 at 1:00 AM, MUHAMMAD AAMIR mas.ha...@gmail.com wrote: Hi Archit, Thanks a lot for your reply. I am using rdd.partitions.length to check the number of partitions; rdd.partitions returns the array of partitions. I would like to add one more question here: do you have any idea how to get the objects in each partition? Further, is there any way to figure out which particular partition an object belongs to? Thanks, On Tue, Apr 21, 2015 at 12:16 PM, Archit Thakur archit279tha...@gmail.com wrote: Hi, This should work. How are you checking the no. of partitions? Thanks and Regards, Archit Thakur. On Mon, Apr 20, 2015 at 7:26 PM, mas mas.ha...@gmail.com wrote: Hi, I aim to do custom partitioning on a text file. I first convert it into a pairRDD and then try to use my custom partitioner. However, somehow it is not working. My code snippet is given below.

val file = sc.textFile(filePath)
val locLines = file.map(line => line.split("\t")).map(line => ((line(2).toDouble, line(3).toDouble), line(5).toLong))
val ck = locLines.partitionBy(new HashPartitioner(50)) // new CustomPartitioner(50) -- neither way is working here.

While reading the file using the textFile method it automatically partitions the file. However, when I explicitly want to partition the new rdd locLines, it doesn't appear to do anything and even the number of partitions is the same as created by sc.textFile(). Any help in this regard will be highly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Muhammad Aamir *CONFIDENTIALITY:This email is intended solely for the person(s) named and may be confidential and/or privileged.If you are not the intended recipient,please delete it,notify me and do not copy,use,or disclose its content.* -- Best Regards, Ayan Guha
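A minimal sketch tying those two operators back to the questions asked above, assuming the locLines pair RDD from the quoted snippet (on Spark versions before 1.3, also import org.apache.spark.SparkContext._ for the pair-RDD operations):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(50)
val ck = locLines.partitionBy(partitioner)

// See which objects ended up in which partition.
ck.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(elem => (idx, elem))
}.collect().foreach { case (idx, elem) => println(s"partition $idx -> $elem") }

// Or compute the target partition of a single key directly.
println(partitioner.getPartition((12.34, 56.78)))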
Re: Meet Exception when learning Broadcast Variables
Does line 27 correspond to brdcst.value ? Cheers On Apr 21, 2015, at 3:19 AM, donhoff_h 165612...@qq.com wrote: Hi, experts. I wrote a very little program to learn how to use Broadcast Variables, but met an exception. The program and the exception are listed as following. Could anyone help me to solve this problem? Thanks! **My Program is as following** object TestBroadcast02 { var brdcst : Broadcast[Array[Int]] = null def main(args: Array[String]) { val conf = new SparkConf() val sc = new SparkContext(conf) brdcst = sc.broadcast(Array(1,2,3,4,5,6)) val rdd = sc.textFile(hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt) rdd.foreachPartition(fun1) sc.stop() } def fun1(it : Iterator[String]) : Unit = { val v = brdcst.value for(i - v) println(BroadCast Variable:+i) for(j - it) println(Text File Content:+j) } } **The Exception is as following** 15/04/21 17:39:53 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, bgdt01.dev.hrb): java.lang.NullPointerException at dhao.test.BroadCast.TestBroadcast02$.fun1(TestBroadcast02.scala:27) at dhao.test.BroadCast.TestBroadcast02$$anonfun$main$1.apply(TestBroadcast02.scala:22) at dhao.test.BroadCast.TestBroadcast02$$anonfun$main$1.apply(TestBroadcast02.scala:22) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) By the way, if I use anonymous function instead of 'fun1' in my program, it works. But since I think the readability is not good for anonymous functions, I still prefer to use the 'fun1' . - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
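One hedged sketch, not necessarily the root cause here: pass the Broadcast handle into the function explicitly, so the closure carries it to the executors instead of reading a top-level field that may still be null there (which is consistent with the anonymous-function version working).

import org.apache.spark.broadcast.Broadcast

def fun1(b: Broadcast[Array[Int]])(it: Iterator[String]): Unit = {
  for (i <- b.value) println("BroadCast Variable:" + i)
  for (j <- it) println("Text File Content:" + j)
}

// inside main, after creating sc, brdcst and rdd as in the original code:
// rdd.foreachPartition(fun1(brdcst))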
Problem with using Spark ML
Hi, I've written an application that performs some machine learning on some data. I've validated that the data _should_ give a good output with a decent RMSE by using Lib-SVM: Mean squared error = 0.00922063 (regression) Squared correlation coefficient = 0.9987 (regression) When I try to use Spark ML to do the exact same thing I get: Mean Squared Error = 8.466193152067944E224 Which is somewhat worse.. I've tried to look at the data before it's inputted to the model, and printed that data to file (which is actually the data used when I got the result from Lib-SVM above). Somewhere there must be a huge mistake, but I cannot locate it in my code (see below). trainingLP and testLP are the training and test data, as RDD[LabeledPoint].

// Generate model
val model_gen = new RidgeRegressionWithSGD();
val model = model_gen.run(trainingLP);

// Predict on the test-data
val valuesAndPreds = testLP.map { point =>
  val prediction = model.predict(point.features);
  println("label: " + point.label + ", pred: " + prediction);
  (point.label, prediction);
}
val MSE = valuesAndPreds.map { case (v, p) => math.pow((v - p), 2) }.mean();
println("Mean Squared Error = " + MSE)

I've printed label and prediction values for each data point in the test set, and the result is something like this;
label: 5.04, pred: -4.607899000641277E112
label: 3.59, pred: -3.96787105480399E112
label: 5.06, pred: -2.8263294374576145E112
label: 2.85, pred: -1.1536508029072844E112
label: 2.1, pred: -4.269312783707508E111
label: 2.75, pred: -3.0072665148591558E112
label: -0.29, pred: -2.035681731641989E112
label: 1.98, pred: -3.163404340354783E112
So there is obviously something wrong with the prediction step. I'm using the SparseVector representation of the Vector in LabeledPoint, looking something like this for reference (shortened for convenience);
(-1.59,(2080,[29,59,62,74,127,128,131,144,149,175,198,200,239,247,267,293,307,364,374,393,410,424,425,431,448,469,477,485,501,525,532,533,538,560,..],[1.0,1.0,2.0,8.0,1.0,1.0,6.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,8.0,2.0,1.0,1.0,..]))
(-1.75,(2080,[103,131,149,208,296,335,520,534,603,620,661,694,709,748,859,1053,1116,1156,1186,1207,1208,1223,1256,1278,1356,1375,1399,1480,1569,..],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,2.0,2.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,4.0,1.0,7.0,1.0,3.0,2.0,1.0]))
I do get one type of warning, but that's about it! (And as to my understanding, this native code is not required to get the correct results, only to improve performance.)
6010 [main] WARN com.github.fommil.netlib.BLAS - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
6011 [main] WARN com.github.fommil.netlib.BLAS - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
So where do I go from here? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-using-Spark-ML-tp22591.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
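A hedged sketch of one common thing to try when SGD-based regressors produce astronomically large predictions like the above: the gradient descent may have diverged, so tuning the optimizer (and standardizing the features) is worth a shot. The numbers below are illustrative starting points, not recommendations, and trainingLP is the RDD[LabeledPoint] from the code above.

import org.apache.spark.mllib.regression.RidgeRegressionWithSGD

val ridge = new RidgeRegressionWithSGD()
ridge.optimizer
  .setNumIterations(200)
  .setStepSize(0.01)   // a smaller step size often tames divergence
  .setRegParam(0.1)
val tunedModel = ridge.run(trainingLP)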
Spark Streaming updateStateByKey throws OutOfMemory Error
Hi, We are building a spark streaming application which reads from kafka, does updateStateBykey based on the received message type and finally stores into redis. After running for few seconds the executor process get killed by throwing OutOfMemory error. The code snippet is below: *NoOfReceiverInstances = 1* *val kafkaStreams = (1 to NoOfReceiverInstances).map(* * _ = KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)* *)* *val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) = {...}* *ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...)..updateStateByKey(updateFunc).foreachRDD(_.foreachPartition(RedisHelper.update(_)))* *object RedisHelper {* * private val client = scredis.Redis(* * ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)* * )* * def update(**itr: Iterator[(String, (Long, Long))]) {* *// redis save operation* * }* *}* *Below is the spark configuration:* *spark.app.name http://spark.app.name = XXX* *spark.jars = .jar* *spark.home = /spark-1.1.1-bin-hadoop2.4* *spark.executor.memory = 1g* *spark.streaming.concurrentJobs = 1000* *spark.logConf = true* *spark.cleaner.ttl = 3600 //in milliseconds* *spark.default.parallelism = 12* *spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError* *spark.executor.logs.rolling.strategy = size* *spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB* *spark.executor.logs.rolling.maxRetainedFiles = 10* *spark.serializer = org.apache.spark.serializer.KryoSerializer* *spark.kryo.registrator = xxx.NoOpKryoRegistrator* other configurations are below *streaming {* *// All streaming context related configs should come here* *batch-duration = 1 second* *checkpoint-directory = /tmp* *checkpoint-duration = 10 seconds* *slide-duration = 1 second* *window-duration = 1 second* *partitions-for-shuffle-task = 32* * }* * kafka {* *no-of-receivers = 1* *zookeeper-quorum = :2181* *consumer-group = x* *topic = x:2* * }* We tried different combinations like - with spark 1.1.0 and 1.1.1. - by increasing executor memory - by changing the serialization strategy (switching between kryo and normal java) - by changing broadcast strategy (switching between http and torrent broadcast) Can anyone give any insight what we are missing here? How can we fix this? Due to akka version mismatch with some other libraries we cannot upgrade the spark version. Thanks, -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
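A hedged sketch with simplified types (plain Long counts standing in for the IConcurrentUsers messages): updateStateByKey keeps a key's state until the update function returns None for that key, so state that is never expired is a common source of steadily growing memory use in this kind of job.

val updateFunc = (values: Seq[Long], state: Option[(Long, Long)]) => {
  val previous = state.map(_._1).getOrElse(0L)
  val updated = previous + values.sum
  if (values.isEmpty && updated == 0L) None               // drop keys with no activity and empty state
  else Some((updated, System.currentTimeMillis()))
}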
Re: Column renaming after DataFrame.groupBy
Hi, There are 2 ways of doing it. 1. Using SQL - this method directly creates another dataframe object. 2. Using methods of the DF object, but in that case you have to provide the schema through a Row object. In this case you need to explicitly call createDataFrame again, which will infer the schema for you. Here is the Python code:

Method 1:
userStat = ssc.sql("select userId, sum(rating) total from ratings group by userId")
print userStat.collect()[10]
userStat.printSchema()

Method 2:
userStatDF = userStat.groupBy("userId").sum().map(lambda t: Row(userId=t[0], total=t[1]))
userStatDFSchema = ssc.createDataFrame(userStatDF)
print type(userStatDFSchema)
print userStatDFSchema.printSchema()

Output:
Row(userId=233, total=478)
root
 |-- userId: long (nullable = true)
 |-- total: long (nullable = true)
class 'pyspark.sql.dataframe.DataFrame'
root
 |-- total: long (nullable = true)
 |-- userId: long (nullable = true)

As you can see, the downside of Method 2 is that the order of the fields is now inferred (and most likely built from a dict under the hood), so they come out ordered alphabetically. Hope this helps. Best Ayan On Tue, Apr 21, 2015 at 6:06 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I would like to rename a column after aggregation. In the following code, the column name is SUM(_1#179), is there a way to rename it to a more friendly name?

scala> val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10)))
scala> d.groupBy("_1").sum().printSchema
root
 |-- _1: integer (nullable = false)
 |-- SUM(_1#179): long (nullable = true)
 |-- SUM(_2#180): long (nullable = true)

Thanks. Justin -- View this message in context: Column renaming after DataFrame.groupBy http://apache-spark-user-list.1001560.n3.nabble.com/Column-renaming-after-DataFrame-groupBy-tp22586.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com. -- Best Regards, Ayan Guha
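A small sketch against the Spark 1.3 Scala DataFrame API that names the aggregate up front, which avoids both the generated SUM(_2#180) column name and the field reordering of Method 2; the alias "total" is arbitrary.

import org.apache.spark.sql.functions.sum

val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10)))
val grouped = d.groupBy("_1").agg(sum("_2").as("total"))
grouped.printSchema()
// expected:
// root
//  |-- _1: integer (nullable = false)
//  |-- total: long (nullable = true)

DataFrame.withColumnRenamed is another option if the aggregate has already been computed with the generated name.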
Re: Custom Partitioning Spark
Hi, This should work. How are you checking the no. of partitions.? Thanks and Regards, Archit Thakur. On Mon, Apr 20, 2015 at 7:26 PM, mas mas.ha...@gmail.com wrote: Hi, I aim to do custom partitioning on a text file. I first convert it into pairRDD and then try to use my custom partitioner. However, somehow it is not working. My code snippet is given below. val file=sc.textFile(filePath) val locLines=file.map(line = line.split(\t)).map(line= ((line(2).toDouble,line(3).toDouble),line(5).toLong)) val ck=locLines.partitionBy(new HashPartitioner(50)) // new CustomPartitioner(50) -- none of the way is working here. while reading the file using textFile method it automatically partitions the file. However when i explicitly want to partition the new rdd locLines, It doesn't appear to do anything and even the number of partitions are same which is created by sc.textFile(). Any help in this regard will be highly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark 1.3.1 Dataframe breaking ALS.train?
Hi I am getting an error Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD It was working fine in 1.2.0 (till last night :)) Any solution? I am thinking to map the training dataframe back to a RDD, byt will lose the schema information. Best Ayan On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com wrote: Hi Just upgraded to Spark 1.3.1. I am getting an warning Warning (from warnings module): File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py, line 191 warnings.warn(inferSchema is deprecated, please use createDataFrame instead) UserWarning: inferSchema is deprecated, please use createDataFrame instead However, documentation still says to use inferSchema. Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in section Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Rank:8 Lmbda:1.0 iteration:10 Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD -- Best Regards, Ayan Guha -- Best Regards, Ayan Guha
RE:RE:maven compile error
I had a similar issue (I failed on the Spark Core project but with the same exception as you). Then I followed the steps below (I am working on Windows): delete the Maven repository and re-download all dependencies. The issue sounds like a corrupted jar that can't be opened by Maven. Other than this, I also did the steps below (I don't think they are the solution, I just describe my steps):
1. Uninstall the Scala 2.11 version (I had one installed there). Then I only have 2.10.5 on my PC.
2. Upgrade Maven to the latest 3.3.1.
3. Install the latest git client.
Regards, Shuai
From: myelinji [mailto:myeli...@aliyun.com] Sent: Friday, April 03, 2015 6:58 AM To: Sean Owen Cc: Spark user group Subject: Reply: maven compile error
Thank you for your reply. When I'm using maven to compile the whole project, the errors are as follows:
[INFO] Spark Project Parent POM .. SUCCESS [4.136s]
[INFO] Spark Project Networking .. SUCCESS [7.405s]
[INFO] Spark Project Shuffle Streaming Service ... SUCCESS [5.071s]
[INFO] Spark Project Core SUCCESS [3:08.445s]
[INFO] Spark Project Bagel ... SUCCESS [21.613s]
[INFO] Spark Project GraphX .. SUCCESS [58.915s]
[INFO] Spark Project Streaming ... SUCCESS [1:26.202s]
[INFO] Spark Project Catalyst FAILURE [1.537s]
[INFO] Spark Project SQL . SKIPPED
[INFO] Spark Project ML Library .. SKIPPED
[INFO] Spark Project Tools ... SKIPPED
[INFO] Spark Project Hive SKIPPED
[INFO] Spark Project REPL SKIPPED
[INFO] Spark Project Assembly SKIPPED
[INFO] Spark Project External Twitter SKIPPED
[INFO] Spark Project External Flume Sink . SKIPPED
[INFO] Spark Project External Flume .. SKIPPED
[INFO] Spark Project External MQTT ... SKIPPED
[INFO] Spark Project External ZeroMQ . SKIPPED
[INFO] Spark Project External Kafka .. SKIPPED
[INFO] Spark Project Examples SKIPPED
It seems like there is something wrong with the catalyst project. Why can't I compile this project?
-- From: Sean Owen so...@cloudera.com Sent: Friday, April 3, 2015 17:48 To: myelinji myeli...@aliyun.com Cc: Spark user group user@spark.apache.org Subject: Re: maven compile error
If you're asking about a compile error, you should include the command you used to compile. I am able to compile branch 1.2 successfully with mvn -DskipTests clean package. This error is actually an error from scalac, not a compile error from the code. It sort of sounds like it has not been able to download the scala dependencies. Check or maybe recreate your environment. On Fri, Apr 3, 2015 at 3:19 AM, myelinji myeli...@aliyun.com wrote: Hi all, just now I checked out spark-1.2 on GitHub, wanting to build it with maven, however I encountered an error during compiling:
[INFO] [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project spark-catalyst_2.10: wrap: scala.reflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found. - [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project spark-catalyst_2.10: wrap: scala.reflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found.
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59) at org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320) at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156) at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537) at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196) at org.apache.maven.cli.MavenCli.main(MavenCli.java:141) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at
Re: Understanding the build params for spark with sbt.
Hi Shiyao, From the same page you referred to: "Maven is the official recommendation for packaging Spark, and is the 'build of reference'. But SBT is supported for day-to-day development since it can provide much faster iterative compilation. More advanced developers may wish to use SBT." For Maven, pom.xml is the main and important file.

-P stands for Profile. Search for 'profile' in spark/pom.xml. More on it: http://maven.apache.org/guides/introduction/introduction-to-profiles.html

-D stands for Define. Maven takes it from Java and earlier tools; it is a way to pass system properties and/or override existing properties from the build file.

Core build: spark/core/pom.xml is the build file for building only Spark Core.

Thanking you. With Regards Sree On Tuesday, April 21, 2015 12:12 AM, Akhil Das ak...@sigmoidanalytics.com wrote: With maven you could do it like: mvn -Dhadoop.version=2.3.0 -DskipTests clean package -pl core Thanks Best Regards On Mon, Apr 20, 2015 at 8:10 PM, Shiyao Ma i...@introo.me wrote: Hi. My usage is only about the spark core and hdfs, so no spark sql or mllib or other components involved. I saw the hint on http://spark.apache.org/docs/latest/building-spark.html, with a sample like: build/sbt -Pyarn -Phadoop-2.3 assembly. (What's the -P for?) Fundamentally, I'd like to let sbt only compile and package the core and the hadoop parts. Meanwhile, it would be appreciated if you could tell me which scala file controls the logic of -Pyarn, so that I can dig into the build source and have finer control. Thanks. -- "I am a cat." My homepage is http://introo.me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: HiveContext setConf seems not stable
As a workaround, can you call getConf first before any setConf? On Tue, Apr 21, 2015 at 1:58 AM, Ophir Cohen oph...@gmail.com wrote: I think I encounter the same problem, I'm trying to turn on the compression of Hive. I have the following lines: def initHiveContext(sc: SparkContext): HiveContext = { val hc: HiveContext = new HiveContext(sc) hc.setConf(hive.exec.compress.output, true) hc.setConf(mapreduce.output.fileoutputformat.compress.codec, org.apache.hadoop.io.compress.SnappyCodec) hc.setConf(mapreduce.output.fileoutputformat.compress.type, BLOCK) logger.info(hc.getConf(hive.exec.compress.output)) logger.info (hc.getConf(mapreduce.output.fileoutputformat.compress.codec)) logger.info (hc.getConf(mapreduce.output.fileoutputformat.compress.type)) hc } And the log for calling it twice: 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: false 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: org.apache.hadoop.io.compress.SnappyCodec 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: true 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: org.apache.hadoop.io.compress.SnappyCodec 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK BTW It worked on 1.2.1... On Thu, Apr 2, 2015 at 11:47 AM, Hao Ren inv...@gmail.com wrote: Hi, Jira created: https://issues.apache.org/jira/browse/SPARK-6675 Thank you. On Wed, Apr 1, 2015 at 7:50 PM, Michael Armbrust mich...@databricks.com wrote: Can you open a JIRA please? On Wed, Apr 1, 2015 at 9:38 AM, Hao Ren inv...@gmail.com wrote: Hi, I find HiveContext.setConf does not work correctly. Here are some code snippets showing the problem: snippet 1: import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkConf, SparkContext} object Main extends App { val conf = new SparkConf() .setAppName(context-test) .setMaster(local[8]) val sc = new SparkContext(conf) val hc = new HiveContext(sc) *hc.setConf(spark.sql.shuffle.partitions, 10)* * hc.setConf(hive.metastore.warehouse.dir, /home/spark/hive/warehouse_test)* hc.getAllConfs filter(_._1.contains(warehouse.dir)) foreach println hc.getAllConfs filter(_._1.contains(shuffle.partitions)) foreach println } *Results:* (hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test) (spark.sql.shuffle.partitions,10) snippet 2: ... *hc.setConf(hive.metastore.warehouse.dir, /home/spark/hive/warehouse_test)* * hc.setConf(spark.sql.shuffle.partitions, 10)* hc.getAllConfs filter(_._1.contains(warehouse.dir)) foreach println hc.getAllConfs filter(_._1.contains(shuffle.partitions)) foreach println ... *Results:* (hive.metastore.warehouse.dir,/user/hive/warehouse) (spark.sql.shuffle.partitions,10) *You can see that I just permuted the two setConf call, then that leads to two different Hive configuration.* *It seems that HiveContext can not set a new value on hive.metastore.warehouse.dir key in one or the first setConf call.* *You need another setConf call before changing hive.metastore.warehouse.dir. For example, set hive.metastore.warehouse.dir twice and the snippet 1* snippet 3: ... * hc.setConf(hive.metastore.warehouse.dir, /home/spark/hive/warehouse_test)* * hc.setConf(hive.metastore.warehouse.dir, /home/spark/hive/warehouse_test)* hc.getAllConfs filter(_._1.contains(warehouse.dir)) foreach println ... *Results:* (hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test) *You can reproduce this if you move to the latest branch-1.3 (1.3.1-snapshot, htag = 7d029cb1eb6f1df1bce1a3f5784fb7ce2f981a33)* *I have also tested the released 1.3.0 (htag = 4aaf48d46d13129f0f9bdafd771dd80fe568a7dc). 
It has the same problem.* *Please tell me if I am missing something. Any help is highly appreciated.* Hao -- Hao Ren {Data, Software} Engineer @ ClaraVista Paris, France -- Hao Ren {Data, Software} Engineer @ ClaraVista Paris, France
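A sketch of the workaround suggested at the top of this thread (reading a property before the first setConf), folded into the init helper from Ophir's mail; property names are the ones already used above.

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

def initHiveContext(sc: SparkContext): HiveContext = {
  val hc = new HiveContext(sc)
  // Read something first so the underlying configuration is initialized,
  // then apply the desired overrides.
  hc.getConf("hive.exec.compress.output", "")
  hc.setConf("hive.exec.compress.output", "true")
  hc.setConf("mapreduce.output.fileoutputformat.compress.codec",
    "org.apache.hadoop.io.compress.SnappyCodec")
  hc.setConf("mapreduce.output.fileoutputformat.compress.type", "BLOCK")
  hc
}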
Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :) Thanks, On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, so I start the context at the very end when all the piping is done. BTW a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solve the problem... What does this exception mean in general? On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote: When are you getting this exception? After starting the context? TD On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, I am getting this serialization exception and I am not too sure what Graph is unexpectedly null when DStream is being serialized means? 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread Driver org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable( ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean( ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream. scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$anonfun$ writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala: 985) at org.apache.spark.streaming.dstream.DStream.writeObject( DStream.scala:403) The operation comes down to something like this: dstream.map(tuple = { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } However if I remove the context bounds from K in fetch e.g. removing ClassTag and Ordering then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
Re: SparkSQL performance
Thanks Michael! I have tried applying my schema programatically but I didn't get any improvement on performance :( Could you point me to some code examples using Avro please? Many thanks again! Renato M. 2015-04-21 20:45 GMT+02:00 Michael Armbrust mich...@databricks.com: Here is an example using rows directly: https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#programmatically-specifying-the-schema Avro or parquet input would likely give you the best performance. On Tue, Apr 21, 2015 at 4:28 AM, Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com wrote: Thanks for the hints guys! much appreciated! Even if I just do a something like: Select * from tableX where attribute1 5 I see similar behaviour. @Michael Could you point me to any sample code that uses Spark's Rows? We are at a phase where we can actually change our JavaBeans for something that provides a better performance than what we are seeing now. Would you recommend using Avro presentation then? Thanks again! Renato M. 2015-04-21 1:18 GMT+02:00 Michael Armbrust mich...@databricks.com: There is a cost to converting from JavaBeans to Rows and this code path has not been optimized. That is likely what you are seeing. On Mon, Apr 20, 2015 at 3:55 PM, ayan guha guha.a...@gmail.com wrote: SparkSQL optimizes better by column pruning and predicate pushdown, primarily. Here you are not taking advantage of either. I am curious to know what goes in your filter function, as you are not using a filter in SQL side. Best Ayan On 21 Apr 2015 08:05, Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com wrote: Does anybody have an idea? a clue? a hint? Thanks! Renato M. 2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com: Hi all, I have a simple query Select * from tableX where attribute1 between 0 and 5 that I run over a Kryo file with four partitions that ends up being around 3.5 million rows in our case. If I run this query by doing a simple map().filter() it takes around ~9.6 seconds but when I apply schema, register the table into a SqlContext, and then run the query, it takes around ~16 seconds. This is using Spark 1.2.1 with Scala 2.10.0 I am wondering why there is such a big gap on performance if it is just a filter. Internally, the relation files are mapped to a JavaBean. This different data presentation (JavaBeans vs SparkSQL internal representation) could lead to such difference? Is there anything I could do to make the performance get closer to the hard-coded option? Thanks in advance for any suggestions or ideas. Renato M.
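For reference, a rough sketch of the row-based approach the linked guide documents (Spark 1.3 API); only attribute1 comes from this thread, the second column and the sample rows are made up. On 1.2.x the equivalent call is applySchema rather than createDataFrame.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("attribute1", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))
val rowRDD = sc.parallelize(Seq(Row(3, "a"), Row(7, "b")))
val tableX = sqlContext.createDataFrame(rowRDD, schema)
tableX.registerTempTable("tableX")
sqlContext.sql("SELECT * FROM tableX WHERE attribute1 BETWEEN 0 AND 5").collect()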
Re: Features scaling
Hi Denys, I don't see any issue in your Python code, so maybe there is a bug in the Python wrapper. If it's in Scala, I think it should work. BTW, LogisticRegressionWithLBFGS does the standardization internally, so you don't need to do it yourself. It's worth giving it a try! Sincerely, DB Tsai --- Blog: https://www.dbtsai.com On Tue, Apr 21, 2015 at 1:00 AM, Denys Kozyr dko...@gmail.com wrote: Hi! I want to normalize features before training logistic regression. I set up the scaler:

scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)

and apply it to a dataset:

scaledData = dataset.map(lambda x: LabeledPoint(x.label, scaler2.transform(Vectors.dense(x.features.toArray()))))

but I can't work with scaledData (can't output it or train a regression on it); I get an error:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Is this the correct code for normalization? Why doesn't it work? Any advice is welcome. Thanks. Full code: https://gist.github.com/dkozyr/d31551a3ebed0ee17772 Console output: https://gist.github.com/dkozyr/199f0d4f44cf522f9453 Denys - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
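For comparison, the same pattern in Scala that DB Tsai says should work, with a tiny made-up dataset only to show the shape of the API:

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val dataset = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 10.0)),
  LabeledPoint(1.0, Vectors.dense(2.0, 20.0))))

val scaler = new StandardScaler(withMean = true, withStd = true).fit(dataset.map(_.features))
val scaled = dataset.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))
scaled.collect().foreach(println)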
Re: Spark Performance on Yarn
Try --executor-memory 5g , because you have 8 gb RAM in each machine -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22603.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming updateStateByKey throws OutOfMemory Error
Hi Sourav, Can you post your updateFunc as well please ? Regards, Olivier. Le mar. 21 avr. 2015 à 12:48, Sourav Chandra sourav.chan...@livestream.com a écrit : Hi, We are building a spark streaming application which reads from kafka, does updateStateBykey based on the received message type and finally stores into redis. After running for few seconds the executor process get killed by throwing OutOfMemory error. The code snippet is below: *NoOfReceiverInstances = 1* *val kafkaStreams = (1 to NoOfReceiverInstances).map(* * _ = KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)* *)* *val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) = {...}* *ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...)..updateStateByKey(updateFunc).foreachRDD(_.foreachPartition(RedisHelper.update(_)))* *object RedisHelper {* * private val client = scredis.Redis(* * ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)* * )* * def update(**itr: Iterator[(String, (Long, Long))]) {* *// redis save operation* * }* *}* *Below is the spark configuration:* *spark.app.name http://spark.app.name = XXX* *spark.jars = .jar* *spark.home = /spark-1.1.1-bin-hadoop2.4* *spark.executor.memory = 1g* *spark.streaming.concurrentJobs = 1000* *spark.logConf = true* *spark.cleaner.ttl = 3600 //in milliseconds* *spark.default.parallelism = 12* *spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError* *spark.executor.logs.rolling.strategy = size* *spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB* *spark.executor.logs.rolling.maxRetainedFiles = 10* *spark.serializer = org.apache.spark.serializer.KryoSerializer* *spark.kryo.registrator = xxx.NoOpKryoRegistrator* other configurations are below *streaming {* *// All streaming context related configs should come here* *batch-duration = 1 second* *checkpoint-directory = /tmp* *checkpoint-duration = 10 seconds* *slide-duration = 1 second* *window-duration = 1 second* *partitions-for-shuffle-task = 32* * }* * kafka {* *no-of-receivers = 1* *zookeeper-quorum = :2181* *consumer-group = x* *topic = x:2* * }* We tried different combinations like - with spark 1.1.0 and 1.1.1. - by increasing executor memory - by changing the serialization strategy (switching between kryo and normal java) - by changing broadcast strategy (switching between http and torrent broadcast) Can anyone give any insight what we are missing here? How can we fix this? Due to akka version mismatch with some other libraries we cannot upgrade the spark version. Thanks, -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Re: Spark 1.3.1 Dataframe breaking ALS.train?
Thank you all. On 22 Apr 2015 04:29, Xiangrui Meng men...@gmail.com wrote: SchemaRDD subclasses RDD in 1.2, but DataFrame is no longer an RDD in 1.3. We should allow DataFrames in ALS.train. I will submit a patch. You can use `ALS.train(training.rdd, ...)` for now as a workaround. -Xiangrui On Tue, Apr 21, 2015 at 10:51 AM, Joseph Bradley jos...@databricks.com wrote: Hi Ayan, If you want to use DataFrame, then you should use the Pipelines API (org.apache.spark.ml.*) which will take DataFrames: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS In the examples/ directory for ml/, you can find a MovieLensALS example. Good luck! Joseph On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote: Hi I am getting an error Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD It was working fine in 1.2.0 (till last night :)) Any solution? I am thinking to map the training dataframe back to a RDD, byt will lose the schema information. Best Ayan On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com wrote: Hi Just upgraded to Spark 1.3.1. I am getting an warning Warning (from warnings module): File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py, line 191 warnings.warn(inferSchema is deprecated, please use createDataFrame instead) UserWarning: inferSchema is deprecated, please use createDataFrame instead However, documentation still says to use inferSchema. Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in section Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) 
Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Rank:8 Lmbda:1.0 iteration:10 Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD -- Best Regards, Ayan Guha -- Best Regards, Ayan Guha
Re: how to make a spark cluster ?
I did some performance checks on socLiveJournal PageRank between my local machine (8 cores, 16 GB) in local mode and my small cluster (4 nodes, 12 cores, 40 GB), and I found cluster mode is way faster than local mode. So I am confused.

no. of iterations    Local mode (in mins)    Cluster mode (in mins)
 1                    20                      1
 2                    31.3                    1.2
 3                    39.5                    1.3
 5                    56.4                    1.6
10                   117.26                   2.6

With the help of this, I think I might install a Spark cluster on the same machine and, instead of giving local[no. of cores], set the master to spark://host:7070. Please let me know if I am wrong somewhere.

On Tue, Apr 21, 2015 at 6:27 PM, Reynold Xin r...@databricks.com wrote: Actually if you only have one machine, just use the Spark local mode. Just download the Spark tarball, untar it, set master to local[N], where N = number of cores. You are good to go. There is no setup of job tracker or Hadoop. On Mon, Apr 20, 2015 at 3:21 PM, haihar nahak harihar1...@gmail.com wrote: Thank you :) On Mon, Apr 20, 2015 at 4:46 PM, Jörn Franke jornfra...@gmail.com wrote: Hi, If you have just one physical machine then I would try out Docker instead of a full VM (would be waste of memory and CPU). Best regards Le 20 avr. 2015 00:11, hnahak harihar1...@gmail.com a écrit : Hi All, I've big physical machine with 16 CPUs , 256 GB RAM, 20 TB Hard disk. I just need to know what should be the best solution to make a spark cluster? If I need to process TBs of data then 1. Only one machine, which contain driver, executor, job tracker and task tracker everything. 2. create 4 VMs and each VM should consist 4 CPUs , 64 GB RAM 3. create 8 VMs and each VM should consist 2 CPUs , 32 GB RAM each please give me your views/suggestions -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-spark-cluster-tp22563.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- {{{H2N}}}-(@: -- {{{H2N}}}-(@:
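For completeness, the only application-side difference between the two setups is the master URL handed to SparkConf; a tiny sketch (host name, app name and core count are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Local mode: driver and executors all live in one JVM, using up to 8 cores.
val localConf = new SparkConf().setAppName("pagerank-test").setMaster("local[8]")

// Standalone cluster (even on the same physical box): point at the master
// started by sbin/start-all.sh instead.
val clusterConf = new SparkConf().setAppName("pagerank-test").setMaster("spark://host:7077")

val sc = new SparkContext(clusterConf)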
Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
Sure. But in general, I am assuming this Graph is unexpectedly null when DStream is being serialized must mean something. Under which circumstances, such an exception would trigger? On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com wrote: Yeah, I am not sure what is going on. The only way to figure to take a look at the disassembled bytecodes using javap. TD On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com wrote: At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :) Thanks, On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, so I start the context at the very end when all the piping is done. BTW a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solve the problem... What does this exception mean in general? On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote: When are you getting this exception? After starting the context? TD On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, I am getting this serialization exception and I am not too sure what Graph is unexpectedly null when DStream is being serialized means? 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread Driver org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable( ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean( ClosureCleaner.scala:158) at org.apache.spark.SparkContext. clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream. scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$anonfun$ writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala: 985) at org.apache.spark.streaming.dstream.DStream.writeObject( DStream.scala:403) The operation comes down to something like this: dstream.map(tuple = { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } However if I remove the context bounds from K in fetch e.g. removing ClassTag and Ordering then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
It is kind of unexpected, i can imagine a real scenario under which it should trigger. But obviously I am missing something :) TD On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Sure. But in general, I am assuming this Graph is unexpectedly null when DStream is being serialized must mean something. Under which circumstances, such an exception would trigger? On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com wrote: Yeah, I am not sure what is going on. The only way to figure to take a look at the disassembled bytecodes using javap. TD On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com wrote: At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :) Thanks, On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, so I start the context at the very end when all the piping is done. BTW a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solve the problem... What does this exception mean in general? On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote: When are you getting this exception? After starting the context? TD On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, I am getting this serialization exception and I am not too sure what Graph is unexpectedly null when DStream is being serialized means? 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread Driver org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable( ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean( ClosureCleaner.scala:158) at org.apache.spark.SparkContext. clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream. scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$anonfun$ writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala: 985) at org.apache.spark.streaming.dstream.DStream.writeObject( DStream.scala:403) The operation comes down to something like this: dstream.map(tuple = { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } However if I remove the context bounds from K in fetch e.g. removing ClassTag and Ordering then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
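A generic way to narrow down errors like this (an illustration only, not something suggested in the thread) is to run much the same Java serialization Spark applies to closures yourself, before calling DStream.map, so the NotSerializableException names the captured reference directly instead of surfacing through Spark's ClosureCleaner. The closure below is just a stand-in:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Try to serialize a closure the way a task would be; if it captures a
// non-serializable enclosing object, this throws and names the offending field.
def assertSerializable(closure: AnyRef): Unit = {
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  try oos.writeObject(closure) finally oos.close()
}

// Stand-in for the real dstream.map closure:
val f = (tuple: (String, Long)) => (tuple._1, tuple._2 + 1)
assertSerializable(f)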
Re: sparksql - HiveConf not found during task deserialization
Akhil, Thanks for the suggestions. I tried out sc.addJar, --jars, --conf spark.executor.extraClassPath and none of them helped. I added stuff into compute-classpath.sh. That did not change anything. I checked the classpath of the running executor and made sure that the hive jars are in that dir. For me the most confusing thing is that the executor can actually create HiveConf objects, but it cannot find that class when the task deserializer is at work.

On 20 April 2015 at 14:18, Akhil Das ak...@sigmoidanalytics.com wrote: Can you try sc.addJar("/path/to/your/hive/jar"), I think it will resolve it. Thanks Best Regards

On Mon, Apr 20, 2015 at 12:26 PM, Manku Timma manku.tim...@gmail.com wrote: Akhil, But the first case of creating HiveConf on the executor works fine (map case). Only the second case fails. I was suspecting some foul play with classloaders.

On 20 April 2015 at 12:20, Akhil Das ak...@sigmoidanalytics.com wrote: Looks like a missing jar, try to print the classpath and make sure the hive jar is present. Thanks Best Regards

On Mon, Apr 20, 2015 at 11:52 AM, Manku Timma manku.tim...@gmail.com wrote: I am using spark-1.3 with hadoop-provided and hive-provided and hive-0.13.1 profiles. I am running a simple spark job on a yarn cluster by adding all hadoop2 and hive13 jars to the spark classpaths. If I remove hive-provided while building spark, I don't face any issue. But with hive-provided I am getting a java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf in the yarn executor. Code is below:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.hadoop.hive.conf.HiveConf

object Simple {
  def main(args: Array[String]) = {
    val sc = new SparkContext(new SparkConf())
    val sqlC = new org.apache.spark.sql.hive.HiveContext(sc)
    val x = sc.parallelize(1 to 2).map(x => {
      val h = new HiveConf; h.getBoolean("hive.test", false)
    })
    x.collect.foreach(x => println(s"- $x "))
    val result = sqlC.sql("select * from products_avro order by month, name, price")
    result.collect.foreach(println)
  }
}

The first job (involving map) runs fine. HiveConf is instantiated and the conf variable is looked up etc. But the second job (involving the select * query) throws the class not found exception. The task deserializer is the one throwing the exception. It is unable to find the class in its classpath. Not sure what is different from the first job which also involved HiveConf.
157573 [task-result-getter-3] 2015/04/20 11:01:48:287 WARN TaskSetManager: Lost task 0.2 in stage 2.0 (TID 4, localhost): java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf at java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.privateGetDeclaredFields(Class.java:2436) at java.lang.Class.getDeclaredField(Class.java:1946) at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.init(ObjectStreamClass.java:468) at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at
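One way to see what the executor-side classloader can actually resolve (an illustrative snippet, not from the thread; it assumes a live SparkContext named sc) is to probe for HiveConf from inside a task and ship the answer back to the driver:

// Ask the task's context classloader whether it can see HiveConf, and from
// which jar, to compare against the classpath that was configured.
val report = sc.parallelize(1 to 1, 1).map { _ =>
  val cl = Thread.currentThread().getContextClassLoader
  Option(cl.getResource("org/apache/hadoop/hive/conf/HiveConf.class"))
    .map(url => "HiveConf visible at: " + url)
    .getOrElse("HiveConf NOT visible to the executor's context classloader")
}.collect()
report.foreach(println)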
problem writing to s3
I am having a strange problem writing to s3 that I have distilled to this minimal example:

def jsonRaw = s"${outprefix}-json-raw"
def jsonClean = s"${outprefix}-json-clean"

val txt = sc.textFile(inpath)//.coalesce(shards, false)
txt.count

val res = txt.saveAsTextFile(jsonRaw)

val txt2 = sc.textFile(jsonRaw + "/part-*")
txt2.count
txt2.saveAsTextFile(jsonClean)

This code should simply copy files from inpath to jsonRaw and then from jsonRaw to jsonClean. This code executes all the way down to the last line, where it hangs after creating the output directory containing a _temporary_$folder but no actual files, not even temporary ones. `outprefix` is an s3 bucket url; both jsonRaw and jsonClean are in the same bucket. Both .count calls succeed and return the same number. This means Spark can read from inpath and can both read from and write to jsonRaw. Since jsonClean is in the same bucket as jsonRaw and the final line does create the directory, I cannot think of any reason why the files should not be written. If there were any access or url problems they should already manifest when writing jsonRaw.

This problem is completely reproducible with Spark 1.2.1 and 1.3.1. The console output from the last line is:

scala> txt0.saveAsTextFile(jsonClean)

15/04/21 22:55:48 INFO storage.BlockManager: Removing broadcast 3 15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3_piece0 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3_piece0 of size 2024 dropped from memory (free 278251716) 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-51-181-81.ec2.internal:45199 in memory (size: 2024.0 B, free: 265.4 MB) 15/04/21 22:55:48 INFO storage.BlockManagerMaster: Updated info of block broadcast_3_piece0 15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3 of size 2728 dropped from memory (free 27825) 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-166-129-153.ec2.internal:46671 in memory (size: 2024.0 B, free: 13.8 GB) 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-51-153-34.ec2.internal:51691 in memory (size: 2024.0 B, free: 13.8 GB) 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-158-142-155.ec2.internal:54690 in memory (size: 2024.0 B, free: 13.8 GB) 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-61-144-7.ec2.internal:44849 in memory (size: 2024.0 B, free: 13.8 GB) 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-69-77-180.ec2.internal:42417 in memory (size: 2024.0 B, free: 13.8 GB) 15/04/21 22:55:48 INFO spark.ContextCleaner: Cleaned broadcast 3 15/04/21 22:55:49 INFO spark.SparkContext: Starting job: saveAsTextFile at console:38 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Got job 2 (saveAsTextFile at console:38) with 96 output partitions (allowLocal=false) 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Final stage: Stage 2(saveAsTextFile at console:38) 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Missing parents: List() 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[5] at saveAsTextFile at console:38), which has no missing parents 15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(22248) called with curMem=48112, maxMem=278302556 15/04/21 22:55:49 INFO
storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 21.7 KB, free 265.3 MB) 15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(17352) called with curMem=70360, maxMem=278302556 15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 16.9 KB, free 265.3 MB) 15/04/21 22:55:49 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-10-51-181-81.ec2.internal:45199 (size: 16.9 KB, free: 265.4 MB) 15/04/21 22:55:49 INFO storage.BlockManagerMaster: Updated info of block broadcast_4_piece0 15/04/21 22:55:49 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting 96 missing tasks from Stage 2 (MapPartitionsRDD[5] at saveAsTextFile at console:38) 15/04/21 22:55:49 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 96 tasks 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 192, ip-10-166-129-153.ec2.internal, PROCESS_LOCAL, 1377 bytes) 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 193, ip-10-61-144-7.ec2.internal, PROCESS_LOCAL, 1377 bytes) 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 2.0 (TID 194, ip-10-158-142-155.ec2.internal, PROCESS_LOCAL, 1377 bytes) 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 3.0
Efficient saveAsTextFile by key, directory for each key?
Is there an efficient way to save an RDD with saveAsTextFile in such a way that the data gets shuffled into separate directories according to a key? (My end goal is to wrap the result in a multi-partitioned Hive table.)

Suppose you have:

case class MyData(val0: Int, val1: String, directory_name: String)

and an RDD called myrdd with type RDD[MyData]. Suppose that you already have an array of the distinct directory_name's, called distinct_directories. A very inefficient way to do this is:

distinct_directories.foreach( dir_name =>
  myrdd.filter( mydata => mydata.directory_name == dir_name )
    .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
    .coalesce(5)
    .saveAsTextFile("base_dir_name/" + f"$dir_name")
)

I tried this solution, and it does not do the multiple myrdd.filter's in parallel. I'm guessing partitionBy might be in the efficient solution, if an easy efficient solution exists...

Thanks, Arun
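Not an answer from the thread, but one commonly used approach worth sketching here: key the RDD by directory name and write everything in a single pass with the old Hadoop API's MultipleTextOutputFormat, which routes each record to a file name derived from its key. The class name KeyBasedOutput and the 5-partition HashPartitioner below are illustrative choices; myrdd and MyData are as defined in the question.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

// Routes each (key, line) pair into <output>/<key>/part-NNNNN and drops the
// key from the written lines.
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

myrdd
  .map(d => (d.directory_name, Array(d.val0.toString, d.val1).mkString(",")))
  .partitionBy(new HashPartitioner(5))   // optional: group each key's records together
  .saveAsHadoopFile("base_dir_name", classOf[String], classOf[String], classOf[KeyBasedOutput])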
Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
Yeah, I am not sure what is going on. The only way to figure to take a look at the disassembled bytecodes using javap. TD On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com wrote: At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :) Thanks, On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, so I start the context at the very end when all the piping is done. BTW a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solve the problem... What does this exception mean in general? On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote: When are you getting this exception? After starting the context? TD On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, I am getting this serialization exception and I am not too sure what Graph is unexpectedly null when DStream is being serialized means? 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread Driver org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable( ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean( ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream. scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$anonfun$ writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala: 985) at org.apache.spark.streaming.dstream.DStream.writeObject( DStream.scala:403) The operation comes down to something like this: dstream.map(tuple = { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } However if I remove the context bounds from K in fetch e.g. removing ClassTag and Ordering then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
Re: Number of input partitions in SparkContext.sequenceFile
Hi, It should generate the same number of partitions as the number of splits. How did you check the number of partitions? Also, please paste your file size and your hdfs-site.xml and mapred-site.xml here. Thanks and Regards, Archit Thakur.

On Sat, Apr 18, 2015 at 6:20 PM, Wenlei Xie wenlei@gmail.com wrote: Hi, I am wondering about the mechanism that determines the number of partitions created by SparkContext.sequenceFile? For example, although my file has only 4 splits, Spark would create 16 partitions for it. Is it determined by the file size? Is there any way to control it? (Looks like I can only tune minPartitions but not maxPartitions.) Thank you! Best, Wenlei
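For reference, a small sketch of the knob under discussion (the HDFS path is made up and a live SparkContext sc is assumed): minPartitions is only a hint handed to the old-API InputFormat.getSplits(), whose split computation also factors in total file size, HDFS block size and mapred.min.split.size, so the count you actually get is best read back from the RDD itself.

// minPartitions is a lower-bound hint, not an exact request.
val rdd = sc.sequenceFile[String, String]("hdfs:///user/wenlei/data.seq", 4)
println("partitions = " + rdd.partitions.length)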
Meet Exception when learning Broadcast Variables
Hi, experts. I wrote a very small program to learn how to use Broadcast Variables, but met an exception. The program and the exception are listed below. Could anyone help me to solve this problem? Thanks!

My program is as follows:

object TestBroadcast02 {
  var brdcst : Broadcast[Array[Int]] = null

  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    brdcst = sc.broadcast(Array(1,2,3,4,5,6))
    val rdd = sc.textFile("hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt")
    rdd.foreachPartition(fun1)
    sc.stop()
  }

  def fun1(it : Iterator[String]) : Unit = {
    val v = brdcst.value
    for(i <- v) println("BroadCast Variable:" + i)
    for(j <- it) println("Text File Content:" + j)
  }
}

The exception is as follows:

15/04/21 17:39:53 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, bgdt01.dev.hrb): java.lang.NullPointerException
at dhao.test.BroadCast.TestBroadcast02$.fun1(TestBroadcast02.scala:27)
at dhao.test.BroadCast.TestBroadcast02$$anonfun$main$1.apply(TestBroadcast02.scala:22)
at dhao.test.BroadCast.TestBroadcast02$$anonfun$main$1.apply(TestBroadcast02.scala:22)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

By the way, if I use an anonymous function instead of 'fun1' in my program, it works. But since I think the readability of anonymous functions is not good, I still prefer to use 'fun1'.
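For what it's worth, here is a hedged sketch of one way to restructure the program so a named function still works (TestBroadcast03 is just an illustrative name). The underlying issue is that on an executor the singleton object is initialized fresh and main() never runs there, so the brdcst var is still null when fun1 reads it; the anonymous-function version works because the closure captures the broadcast reference itself. Passing the Broadcast handle into the function avoids the object field entirely:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object TestBroadcast03 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    val brdcst = sc.broadcast(Array(1, 2, 3, 4, 5, 6))
    val rdd = sc.textFile("hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt")
    // fun1(brdcst) eta-expands to an Iterator[String] => Unit that carries the
    // Broadcast handle with it, so executors never touch a driver-side field.
    rdd.foreachPartition(fun1(brdcst))
    sc.stop()
  }

  def fun1(b: Broadcast[Array[Int]])(it: Iterator[String]): Unit = {
    for (i <- b.value) println("BroadCast Variable:" + i)
    for (j <- it) println("Text File Content:" + j)
  }
}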
Re: Spark Unit Testing
Hello James, Did you check the following resources:

- https://github.com/apache/spark/tree/master/streaming/src/test/java/org/apache/spark/streaming
- http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs

-- Emre Sevinç http://www.bigindustries.be/

On Tue, Apr 21, 2015 at 1:26 PM, James King jakwebin...@gmail.com wrote: I'm trying to write some unit tests for my spark code. I need to pass a JavaPairDStream<String, String> to my spark class. Is there a way to create a JavaPairDStream using the Java API? Also, is there a good resource that covers an approach (or approaches) for unit testing using Java? Regards jk

-- Emre Sevinc
Re: SparkSQL performance
Thanks for the hints guys! much appreciated! Even if I just do a something like: Select * from tableX where attribute1 5 I see similar behaviour. @Michael Could you point me to any sample code that uses Spark's Rows? We are at a phase where we can actually change our JavaBeans for something that provides a better performance than what we are seeing now. Would you recommend using Avro presentation then? Thanks again! Renato M. 2015-04-21 1:18 GMT+02:00 Michael Armbrust mich...@databricks.com: There is a cost to converting from JavaBeans to Rows and this code path has not been optimized. That is likely what you are seeing. On Mon, Apr 20, 2015 at 3:55 PM, ayan guha guha.a...@gmail.com wrote: SparkSQL optimizes better by column pruning and predicate pushdown, primarily. Here you are not taking advantage of either. I am curious to know what goes in your filter function, as you are not using a filter in SQL side. Best Ayan On 21 Apr 2015 08:05, Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com wrote: Does anybody have an idea? a clue? a hint? Thanks! Renato M. 2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com: Hi all, I have a simple query Select * from tableX where attribute1 between 0 and 5 that I run over a Kryo file with four partitions that ends up being around 3.5 million rows in our case. If I run this query by doing a simple map().filter() it takes around ~9.6 seconds but when I apply schema, register the table into a SqlContext, and then run the query, it takes around ~16 seconds. This is using Spark 1.2.1 with Scala 2.10.0 I am wondering why there is such a big gap on performance if it is just a filter. Internally, the relation files are mapped to a JavaBean. This different data presentation (JavaBeans vs SparkSQL internal representation) could lead to such difference? Is there anything I could do to make the performance get closer to the hard-coded option? Thanks in advance for any suggestions or ideas. Renato M.
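Since sample code was requested, here is a hedged sketch of the Row-based path: rows plus an explicit schema, no JavaBeans. The imports and createDataFrame call are the 1.3+ API; on 1.2.x the equivalent call was sqlContext.applySchema and, if I remember correctly, the schema types lived directly under org.apache.spark.sql. The file name and column names are invented.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Explicit schema instead of bean reflection.
val schema = StructType(Seq(
  StructField("attribute1", IntegerType, nullable = false),
  StructField("payload", StringType, nullable = true)))

// Build Row objects directly from the raw records.
val rows = sc.textFile("tableX.csv")
  .map(_.split(","))
  .map(a => Row(a(0).toInt, a(1)))

val tableX = sqlContext.createDataFrame(rows, schema)
tableX.registerTempTable("tableX")
sqlContext.sql("SELECT * FROM tableX WHERE attribute1 BETWEEN 0 AND 5").count()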
Spark Unit Testing
I'm trying to write some unit tests for my spark code. I need to pass a JavaPairDStream<String, String> to my spark class. Is there a way to create a JavaPairDStream using the Java API? Also, is there a good resource that covers an approach (or approaches) for unit testing using Java? Regards jk
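One common way to drive such a test without Kafka or sockets, shown here as a Scala sketch (the Java API exposes the same hook as JavaStreamingContext.queueStream, and JavaPairDStream.fromJavaDStream can wrap the result), is to push hand-built RDDs through queueStream; names and the batch duration below are placeholders:

import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc  = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unit-test"))
val ssc = new StreamingContext(sc, Seconds(1))

// Each queued RDD becomes one micro-batch of the (String, String) stream.
val batches = mutable.Queue(sc.parallelize(Seq("a" -> "1", "b" -> "2")))
val pairs   = ssc.queueStream(batches)

// Collect what the stream produced and assert on it once a few batches ran.
val seen = mutable.ArrayBuffer[(String, String)]()
pairs.foreachRDD(rdd => seen ++= rdd.collect())

ssc.start()
ssc.awaitTermination(3000)   // let a couple of 1-second batches run
ssc.stop()
assert(seen.toSet == Set("a" -> "1", "b" -> "2"))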
Error while running SparkPi in Hadoop HA
Hi all, I'm wondering if SparkPi works with hadoop HA (I guess it should) Hadoop's pi example works great on my cluster, so after having that done I installed spark and in the worker log I'm seeing two problems that might be related. Versions: Hadoop 2.6.0 Spark 1.3.1 I'm running : ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 2 --driver-memory 2g --executor-memory 1g --executor-cores 1 lib/spark-examples*.jar 10 It seems like it can't see the resource manager but I pinged and telnet it from the node and that works, also: hadoop's pi example works... So I'm not sure why spark is not seeing the RM 15/04/21 15:56:21 INFO yarn.YarnRMClient: Registering the ApplicationMaster*15/04/21 15:56:22 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 http://0.0.0.0/0.0.0.0:8030. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:23 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 http://0.0.0.0/0.0.0.0:8030. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)* 15/04/21 15:56:24 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:25 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:26 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:27 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:28 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:29 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:30 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:31 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. 
Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/21 15:56:51 INFO cluster.YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms) 15/04/21 15:56:51 INFO cluster.YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/04/21 15:56:51 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:35 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Got job 0 (reduce at SparkPi.scala:35) with 10 output partitions (allowLocal=false) 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce at SparkPi.scala:35) 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Missing parents: List() 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:31), which has no missing parents 15/04/21 15:56:51 INFO cluster.YarnClusterScheduler: Cancelling stage 0 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Stage 0 (reduce at SparkPi.scala:35) failed in Unknown s 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Job 0 failed: reduce at SparkPi.scala:35, took 0.121974 s*15/04/21 15:56:51 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) java.lang.reflect.Constructor.newInstance(Constructor.java:526)* org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68) org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73) org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:79)
Re: Join on DataFrames from the same source (Pyspark)
You are correct. Just found the same thing. You are better off with sql, then.

userSchemaDF = ssc.createDataFrame(userRDD)
userSchemaDF.registerTempTable("users")
# print userSchemaDF.take(10)

# SQL API works as expected
sortedDF = ssc.sql("SELECT userId,age,gender,work from users order by work,age")
# print sortedDF.take(10)

df1 = userSchemaDF.select("userId", "age")
df1.registerTempTable("df1")
df2 = userSchemaDF.select("userId", "work")
df2.registerTempTable("df2")
dfjs = ssc.sql("select age,work from df1 d1 inner join df2 d2 on d1.userId=d2.userId")
print dfjs.count()

# DF API does not
dfj = df1.join(df2, df1.userId == df2.userId, "inner")
print dfj.count()

943
889249

On Wed, Apr 22, 2015 at 1:10 AM, Karlson ksonsp...@siberie.de wrote: Sorry, my code actually was df_one = df.select('col1', 'col2') df_two = df.select('col1', 'col3') But in Spark 1.4.0 this does not seem to make any difference anyway and the problem is the same with both versions. On 2015-04-21 17:04, ayan guha wrote: your code should be df_one = df.select('col1', 'col2') df_two = df.select('col1', 'col3') Your current code is generating a tuple, and of course df_1 and df_2 are different, so the join is yielding a cartesian product. Best Ayan On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de wrote: Hi, can anyone confirm (and if so elaborate on) the following problem? When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example: I load a DataFrame with n rows from disk: df = sql_context.parquetFile('data.parquet') Then I create two DataFrames from that source. df_one = df.select(['col1', 'col2']) df_two = df.select(['col1', 'col3']) Finally I want to (inner) join them back together: df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner') The key in col1 is unique. The resulting DataFrame should have n rows, however it does have n*n rows. That does not happen, when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards, Ayan Guha
Re: Instantiating/starting Spark jobs programmatically
Could you possibly describe what you are trying to learn how to do in more detail? Some basics of submitting programmatically:

- Create a SparkContext instance and use that to build your RDDs.
- You can only have 1 SparkContext per JVM you are running, so if you need to satisfy concurrent job requests you would need to manage a SparkContext as a shared resource on that server. Keep in mind that if something goes wrong with that SparkContext, all running jobs would probably be in a failed state and you'd need to try to get a new SparkContext.
- There are System.exit calls built into Spark as of now that could kill your running JVM. We have shadowed some of the most offensive bits within our own application to work around this. You'd likely want to do that or to do your own Spark fork. For example, if the SparkContext can't connect to your cluster master node when it is created, it will System.exit.
- You'll need to provide all of the relevant classes that your platform uses in the jobs on the classpath of the spark cluster. We do this with a JAR file loaded from S3 dynamically by a SparkContext, but there are other options.

On Mon, Apr 20, 2015 at 10:12 PM, firemonk9 dhiraj.peech...@gmail.com wrote: I have built a data analytics SaaS platform by creating REST endpoints, and based on the type of job request I would invoke the necessary spark job/jobs and return the results as json (async). I used yarn-client mode to submit the jobs to the yarn cluster. Hope this helps. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Instantiating-starting-Spark-jobs-programmatically-tp22577p22584.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
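To make the bullets above concrete, a bare-bones sketch of the shared-SparkContext pattern (object name, cluster URL and jar path are placeholders; this is an outline under those assumptions, not a hardened implementation):

import org.apache.spark.{SparkConf, SparkContext}

object SharedSpark {
  // One context per JVM, built lazily and reused by every request handler.
  lazy val sc: SparkContext = new SparkContext(
    new SparkConf()
      .setAppName("analytics-service")
      .setMaster("spark://master:7077")               // or "yarn-client"
      .setJars(Seq("/path/to/platform-jobs.jar")))    // classes your jobs need on executors

  // Example job method a request handler could call; the result goes back to the caller.
  def wordCount(path: String): Long =
    sc.textFile(path).flatMap(_.split("\\s+")).count()
}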