Re: Spark Streaming graceful shutdown in Spark 1.4
By the way this happens when I stooped the Driver process ... On Tue, May 19, 2015 at 12:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true) ? This won't work anymore in 1.4. The SparkContext got stopped before Receiver processed all received blocks and I see below exception in logs. But if I add the Utils.addShutdownHook with the priority as I mentioned , then only graceful shutdown works . In that case shutdown-hook run in priority order. *INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop signal to all 3 receivers* ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 0: Stopped by driver ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 1: Stopped by driver ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 2: Stopped by driver *INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook* INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/metrics/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/static,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool,null} INFO 
: org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs,null} INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at http://10.252.5.113:4040 INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at Consumer.java:122, took 10.398746 s *Exception in thread Thread-28 org.apache.spark.SparkException: Job cancelled because SparkContext was shut down* at
Re: TwitterUtils on Windows
Hi Justin, Can you try with sbt? Maybe that will help.
- Install sbt for Windows: http://www.scala-sbt.org/0.13/tutorial/Installing-sbt-on-Windows.html
- Create a lib directory in your project directory
- Place these jars in it:
  - spark-streaming-twitter_2.10-1.3.1.jar
  - twitter4j-async-3.0.3.jar
  - twitter4j-core-3.0.3.jar
  - twitter4j-media-support-3.0.3.jar
  - twitter4j-stream-3.0.3.jar
- Create a build.sbt file and add these contents:

name := "twitterStream"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.1"

- Create a TwitterStream.scala and add these contents:

import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming._
import org.apache.spark.{SparkContext, SparkConf}

object TwitterStream {
  def main(args: Array[String]) {
    System.setProperty("twitter4j.oauth.consumerKey", "*")
    System.setProperty("twitter4j.oauth.consumerSecret", "*")
    System.setProperty("twitter4j.oauth.accessToken", "*")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "*")
    val sconf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("TwitterStream")
    val sc = new SparkContext(sconf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val stream = TwitterUtils.createStream(ssc, None)
    stream.print() // an output operation is required before start(), otherwise the context will not start
    ssc.start()
    ssc.awaitTermination()
  }
}

- Now do a sbt run

Thanks Best Regards On Tue, May 19, 2015 at 9:56 AM, Justin Pihony justin.pih...@gmail.com wrote: I think I found the answer - http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-td10056.html Do I have no way of running this in Windows locally? On Mon, May 18, 2015 at 10:44 PM, Justin Pihony justin.pih...@gmail.com wrote: I'm not 100% sure that is causing a problem, though. The stream still starts, but is giving blank output. I checked the environment variables in the UI and it is running local[*], so there should be no bottleneck there.
On Mon, May 18, 2015 at 10:08 PM, Justin Pihony justin.pih...@gmail.com wrote: I am trying to print a basic twitter stream and receiving the following error: 15/05/18 22:03:14 INFO Executor: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with timestamp 1432000973058 15/05/18 22:03:14 INFO Utils: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to C:\Users\Justin\AppData\Local\Temp\spark-4a37d3 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:482) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja va:715) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org $apache$spark$executor$Executor$$updateDependencies(Executor.scala:366) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:744) Code is: spark-shell --jars \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar import org.apache.spark.streaming.twitter._ import org.apache.spark.streaming._ System.setProperty(twitter4j.oauth.consumerKey,*) System.setProperty(twitter4j.oauth.consumerSecret,*) System.setProperty(twitter4j.oauth.accessToken,*)
Re: Spark Streaming graceful shutdown in Spark 1.4
I don't think you should rely on a shutdown hook. Ideally you try to stop it in the main exit path of your program, even in case of an exception. On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true) ? This won't work anymore in 1.4. The SparkContext got stopped before Receiver processed all received blocks and I see below exception in logs. But if I add the Utils.addShutdownHook with the priority as I mentioned , then only graceful shutdown works . In that case shutdown-hook run in priority order. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
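A minimal sketch of the pattern Sean describes, assuming a self-contained driver main (the app name and structure are illustrative, not taken from the thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyStreamingApp {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("MyStreamingApp"), Seconds(1))
    // ... create DStreams and register output operations here ...
    ssc.start()
    try {
      ssc.awaitTermination()  // blocks until stop() is called or the job fails
    } finally {
      // Main exit path, even after an exception: finish processing received data,
      // then tear down the SparkContext as well.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}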
Re: Spark Streaming graceful shutdown in Spark 1.4
Thanks Sean, you are right. If the driver program is running then I can handle shutdown in the main exit path. But if the driver machine crashes, or you just stop the application (for example by killing the driver process), then a shutdown hook is the only option, isn't it? What I am trying to say is that just calling ssc.stop in sys.ShutdownHookThread or Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need to use Utils.addShutdownHook with a priority. So I am just checking whether Spark Streaming can make graceful shutdown the default shutdown mechanism. Dibyendu On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: I don't think you should rely on a shutdown hook. Ideally you try to stop it in the main exit path of your program, even in case of an exception. On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true)? This won't work anymore in 1.4. The SparkContext got stopped before the Receiver processed all received blocks, and I see the below exception in the logs. But if I add Utils.addShutdownHook with the priority I mentioned, then graceful shutdown works. In that case shutdown hooks run in priority order.
Re: group by and distinct performance issue
Hi Peer, If you open the driver UI (running on port 4040) you can see the stages and the tasks running inside them. The best way to identify the bottleneck for a stage is to check whether any time is being spent on GC and how many tasks there are per stage (it should be at least the total # of cores to achieve max parallelism). You can also take into account how long each task takes, etc. Thanks Best Regards On Tue, May 19, 2015 at 12:58 PM, Peer, Oded oded.p...@rsa.com wrote: I am running Spark over Cassandra to process a single table. My task reads a single day's worth of data from the table and performs 50 group by and distinct operations, counting distinct userIds by different grouping keys. My code looks like this:

JavaRDD<Row> rdd = sc.parallelize().mapPartitions().cache() // reads the data from the table
for each groupingKey {
    JavaPairRDD<GroupingKey, UserId> groupByRdd = rdd.mapToPair();
    JavaPairRDD<GroupingKey, Long> countRdd = groupByRdd.distinct().mapToPair().reduceByKey() // counts distinct values per grouping key
}

The distinct() stage takes about 2 minutes for every groupByValue, and my task takes well over an hour to complete. My cluster has 4 nodes and 30 GB of RAM per Spark process, the table size is 4 GB. How can I identify the bottleneck more accurately? Is it caused by shuffling data? How can I improve the performance? Thanks, Oded
Re: --jars works in yarn-client but not yarn-cluster mode, why?
Thanks, Marcelo! Below is the full log, SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/avro-tools-1.7.6-cdh5.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/05/19 14:08:58 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT] 15/05/19 14:08:59 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1432015548391_0003_01 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nobody, raofengyun); users with modify permissions: Set(nobody, raofengyun) 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context initialization 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 15/05/19 14:09:00 INFO spark.SparkContext: Running Spark version 1.3.0 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nobody, raofengyun); users with modify permissions: Set(nobody, raofengyun) 15/05/19 14:09:01 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/05/19 14:09:01 INFO Remoting: Starting remoting 15/05/19 14:09:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@gs-server-v-127:7191] 15/05/19 14:09:01 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@gs-server-v-127:7191] 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'sparkDriver' on port 7191. 15/05/19 14:09:01 INFO spark.SparkEnv: Registering MapOutputTracker 15/05/19 14:09:01 INFO spark.SparkEnv: Registering BlockManagerMaster 15/05/19 14:09:01 INFO storage.DiskBlockManager: Created local directory at /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/blockmgr-3250910b-693e-46ff-b057-26d552fd8abd 15/05/19 14:09:01 INFO storage.MemoryStore: MemoryStore started with capacity 259.7 MB 15/05/19 14:09:01 INFO spark.HttpFileServer: HTTP File server directory is /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/httpd-5bc614bc-d8b1-473d-a807-4d9252eb679d 15/05/19 14:09:01 INFO spark.HttpServer: Starting HTTP Server 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/19 14:09:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:9349 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'HTTP file server' on port 9349. 
15/05/19 14:09:01 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/05/19 14:09:01 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/19 14:09:01 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:63023 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'SparkUI' on port 63023. 15/05/19 14:09:01 INFO ui.SparkUI: Started SparkUI at http://gs-server-v-127:63023 15/05/19 14:09:02 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler 15/05/19 14:09:02 INFO netty.NettyBlockTransferService: Server created on 33526 15/05/19 14:09:02 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/05/19 14:09:02 INFO storage.BlockManagerMasterActor: Registering block manager gs-server-v-127:33526 with 259.7 MB RAM, BlockManagerId(driver, gs-server-v-127, 33526) 15/05/19 14:09:02 INFO storage.BlockManagerMaster: Registered BlockManager 15/05/19 14:09:02 INFO scheduler.EventLoggingListener: Logging events to hdfs://gs-server-v-127:8020/user/spark/applicationHistory/application_1432015548391_0003 15/05/19 14:09:02 INFO yarn.ApplicationMaster: Listen to driver: akka.tcp://sparkDriver@gs-server-v-127:7191/user/YarnScheduler 15/05/19 14:09:02 INFO cluster.YarnClusterSchedulerBackend: ApplicationMaster registered as Actor[akka://sparkDriver/user/YarnAM#1902752386] 15/05/19 14:09:02 INFO client.RMProxy: Connecting to ResourceManager at gs-server-v-127/10.200.200.56:8030 15/05/19 14:09:02 INFO yarn.YarnRMClient: Registering the ApplicationMaster 15/05/19 14:09:03 INFO yarn.YarnAllocator: Will request 2 executor containers, each with 1 cores and 4480 MB memory
Re: Spark Streaming graceful shutdown in Spark 1.4
If you wanted to stop it gracefully, then why are you not calling ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't matter whether the shutdown hook was called or not. TD On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi, I just figured out that if I want to perform a graceful shutdown of Spark Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no longer works. In Spark 1.4 there is a Utils.addShutdownHook defined for Spark Core that gets called anyway, which causes graceful shutdown from Spark Streaming to fail with errors like the "SparkContext already closed" issue. To solve this, I need to explicitly add Utils.addShutdownHook in my driver with a higher priority (say 150) than Spark's shutdown priority of 50, and there I call the StreamingContext stop method with the (false, true) parameters. Just curious to know, is this how we need to handle the shutdown hook going forward? Can't we make the streaming shutdown graceful by default? Also, the Java API for adding a shutdown hook in Utils looks very dirty, with methods like this:

Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
    @Override public BoxedUnit apply() { return null; }
    @Override public byte apply$mcB$sp() { return 0; }
    @Override public char apply$mcC$sp() { return 0; }
    @Override public double apply$mcD$sp() { return 0; }
    @Override public float apply$mcF$sp() { return 0; }
    @Override public int apply$mcI$sp() { return 0; }
    @Override public long apply$mcJ$sp() { return 0; }
    @Override public short apply$mcS$sp() { return 0; }
    @Override public void apply$mcV$sp() { jsc.stop(false, true); }
    @Override public boolean apply$mcZ$sp() { return false; }
});
Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application?
If you want the fileStream to start only after a certain event has happened, why not start the StreamingContext after that event? TD On Sun, May 17, 2015 at 7:51 PM, Haopu Wang hw...@qilinsoft.com wrote: I want to use a file stream as input. And looking at the Spark Streaming documentation again, it says a file stream doesn't need a receiver at all. So I'm wondering if I can control a specific DStream instance. -- *From:* Evo Eftimov [mailto:evo.efti...@isecc.com] *Sent:* Monday, May 18, 2015 12:39 AM *To:* 'Akhil Das'; Haopu Wang *Cc:* 'user' *Subject:* RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application? You can make ANY *standard* receiver sleep by implementing a custom Message Deserializer class with a sleep method inside it. *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Sunday, May 17, 2015 4:29 PM *To:* Haopu Wang *Cc:* user *Subject:* Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application? Why not just trigger your batch job with that event? If you really need streaming, then you can create a custom receiver and make the receiver sleep till the event has happened. That will obviously run your streaming pipelines without having any data to process. Thanks Best Regards On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote: In my application, I want to start a DStream computation only after a special event has happened (for example, I want to start the receiver only after the reference data has been properly initialized). My question is: it looks like the DStream will be started right after the StreamingContext has been started. Is it possible to delay the start of a specific DStream? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
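To make that suggestion concrete, a minimal sketch (the waitForReferenceData call and the input path are placeholders for whatever event and source the application actually uses):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DelayedStart {
  def main(args: Array[String]): Unit = {
    // Hypothetical: blocks until the reference data has been initialized
    // (poll a flag, a file, a DB row, ...).
    def waitForReferenceData(): Unit = { }

    val ssc = new StreamingContext(new SparkConf().setAppName("DelayedStart"), Seconds(10))
    ssc.textFileStream("/input/dir").count().print()  // file stream, no receiver involved

    waitForReferenceData()  // wait for the special event first...
    ssc.start()             // ...the DStream only begins running once the context is started
    ssc.awaitTermination()
  }
}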
Re: Broadcast variables can be rebroadcast?
Hi Imran, If I understood you correctly, you are suggesting to simply call broadcast again from the driver program. This is exactly what I am hoping will work, as I have the Broadcast data wrapped up and I am indeed (re)broadcasting the wrapper over again when the underlying data changes. However, the documentation seems to suggest that one cannot re-broadcast. Is my understanding accurate? Thanks NB On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com wrote: Rather than updating the broadcast variable, can't you simply create a new one? When the old one can be gc'ed in your program, it will also get gc'ed from spark's cache (and all executors). I think this will make your code *slightly* more complicated, as you need to add in another layer of indirection for which broadcast variable to use, but not too bad. Eg., from

var myBroadcast = sc.broadcast( ...)
(0 to 20).foreach { iteration =>
  // ... some rdd operations that involve myBroadcast ...
  myBroadcast.update(...) // wrong! don't update a broadcast variable
}

instead do something like:

def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
  ...
}

var myBroadcast = sc.broadcast(...)
(0 to 20).foreach { iteration =>
  oneIteration(myRDD, myBroadcast)
  myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with whatever you need to update it
}

On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote: Thanks Ayan. Can we rebroadcast after updating in the driver? Thanks NB. On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote: Hi, broadcast variables are shipped to the executors used by a transformation the first time they are accessed in that transformation. It will NOT be updated subsequently, even if the value has changed. However, a new value will be shipped to any new executor that comes into play after the value has changed. This way, changing the value of a broadcast variable is not a good idea as it can create inconsistency within the cluster. From the documentation: In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote: Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer. If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks NB -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards, Ayan Guha
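For reference, a small self-contained sketch of the "create a new broadcast each cycle" pattern Imran describes (the lookup-table contents and app name are made up); unpersisting the old value is optional but frees executor memory sooner:

import org.apache.spark.{SparkConf, SparkContext}

object RebroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RebroadcastSketch").setMaster("local[2]"))
    val data = sc.parallelize(1 to 100)

    var lookup = sc.broadcast(Map(0 -> "even", 1 -> "odd"))  // initial value

    (0 to 20).foreach { iteration =>
      val tagged = data.map(x => (x, lookup.value(x % 2)))   // uses whichever broadcast is current
      tagged.count()

      val old = lookup
      lookup = sc.broadcast(Map(0 -> s"even-$iteration", 1 -> s"odd-$iteration"))  // new broadcast
      old.unpersist()  // drop the stale copy from the executors; the new one ships on next use
    }

    sc.stop()
  }
}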
Re: Spark Streaming graceful shutdown in Spark 1.4
You mean to say within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true) ? This won't work anymore in 1.4. The SparkContext got stopped before Receiver processed all received blocks and I see below exception in logs. But if I add the Utils.addShutdownHook with the priority as I mentioned , then only graceful shutdown works . In that case shutdown-hook run in priority order. *INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop signal to all 3 receivers* ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 0: Stopped by driver ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 1: Stopped by driver ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 2: Stopped by driver *INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook* INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/metrics/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/static,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null} INFO : 
org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/json,null} INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs,null} INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at http://10.252.5.113:4040 INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at Consumer.java:122, took 10.398746 s *Exception in thread Thread-28 org.apache.spark.SparkException: Job cancelled because SparkContext was shut down* at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735) INFO : org.apache.spark.scheduler.DAGScheduler
Re: org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3
There were some similar discussion happened on JIRA https://issues.apache.org/jira/browse/SPARK-3633 may be that will give you some insights. Thanks Best Regards On Mon, May 18, 2015 at 10:49 PM, zia_kayani zia.kay...@platalytics.com wrote: Hi, I'm getting this exception after shifting my code from Spark 1.2 to Spark 1.3 15/05/18 18:22:39 WARN TaskSetManager: Lost task 0.0 in stage 1.6 (TID 84, cloud8-server): FetchFailed(BlockManagerId(1, cloud4-server, 7337), shuffleId=0, mapId=9, reduceId=1, message= org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException: Failed to open file: /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202) at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112) at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.FileNotFoundException: /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index (Permission denied) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at 
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:191) ... 23 more -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-shuffle-FetchFailedException-Migration-from-Spark-1-2-to-1-3-tp22937.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
group by and distinct performance issue
I am running Spark over Cassandra to process a single table. My task reads a single day's worth of data from the table and performs 50 group by and distinct operations, counting distinct userIds by different grouping keys. My code looks like this:

JavaRDD<Row> rdd = sc.parallelize().mapPartitions().cache() // reads the data from the table
for each groupingKey {
    JavaPairRDD<GroupingKey, UserId> groupByRdd = rdd.mapToPair();
    JavaPairRDD<GroupingKey, Long> countRdd = groupByRdd.distinct().mapToPair().reduceByKey() // counts distinct values per grouping key
}

The distinct() stage takes about 2 minutes for every groupByValue, and my task takes well over an hour to complete. My cluster has 4 nodes and 30 GB of RAM per Spark process, the table size is 4 GB. How can I identify the bottleneck more accurately? Is it caused by shuffling data? How can I improve the performance? Thanks, Oded
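One pattern that sometimes helps in this situation (a hedged sketch in Scala; the Event type and key extraction are made up stand-ins, not the actual classes above) is to aggregate the set of user IDs per grouping key in a single pass instead of running a full distinct() shuffle for every grouping key:

import org.apache.spark.rdd.RDD

// Hypothetical record type standing in for the Cassandra rows.
case class Event(groupingKey: String, userId: String)

def countDistinctUsers(events: RDD[Event]): RDD[(String, Int)] =
  events
    .map(e => (e.groupingKey, e.userId))
    .aggregateByKey(Set.empty[String])(_ + _, _ ++ _)  // one shuffle; per-key sets merged
    .mapValues(_.size)

If approximate counts are acceptable, countApproxDistinctByKey on the pair RDD avoids building the sets entirely.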
spark streaming doubt
What happens if, in a streaming application, one job is not yet finished when the stream interval is reached? Does it start the next job, or wait for the first to finish while the remaining jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream, what will happen? At any time will there be only one job running, or multiple?
Re: Spark and Flink
it's sound good, maybe you can send me pseudo structure, that is my fist maven project. best regards, paul 2015-05-18 14:05 GMT+02:00 Robert Metzger rmetz...@apache.org: Hi, I would really recommend you to put your Flink and Spark dependencies into different maven modules. Having them both in the same project will be very hard, if not impossible. Both projects depend on similar projects with slightly different versions. I would suggest a maven module structure like this: yourproject-parent (a pom module) -- yourproject-common -- yourproject-flink -- yourproject-spark On Mon, May 18, 2015 at 10:00 AM, Pa Rö paul.roewer1...@googlemail.com wrote: hi, if i add your dependency i get over 100 errors, now i change the version number: dependencies dependency groupIdcom.fasterxml.jackson.module/groupId artifactIdjackson-module-scala_2.10/artifactId version2.4.4/version exclusions exclusion groupIdcom.google.guava/groupId artifactIdguava/artifactId /exclusion /exclusions /dependency now the pom is fine, but i get the same error by run spark: WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.servlet.DefaultServlet-608411067: java.lang.NoSuchMethodError: org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V java.lang.NoSuchMethodError: org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V at org.eclipse.jetty.servlet.NIOResourceCache.init(NIOResourceCache.java:41) at org.eclipse.jetty.servlet.DefaultServlet.init(DefaultServlet.java:223) at javax.servlet.GenericServlet.init(GenericServlet.java:244) at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:442) at org.eclipse.jetty.servlet.ServletHolder.doStart(ServletHolder.java:270) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:721) at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:279) at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:717) at org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:155) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.server.handler.HandlerCollection.doStart(HandlerCollection.java:229) at org.eclipse.jetty.server.handler.ContextHandlerCollection.doStart(ContextHandlerCollection.java:172) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.server.handler.HandlerWrapper.doStart(HandlerWrapper.java:95) at org.eclipse.jetty.server.Server.doStart(Server.java:282) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199) at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209) at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:209) at org.apache.spark.ui.WebUI.bind(WebUI.scala:102) at org.apache.spark.SparkContext.init(SparkContext.scala:224) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53) at 
mgm.tp.bigdata.tempGeoKmeans.Spark.SparkMain.main(SparkMain.java:37) ... What am I doing wrong? best regards, paul 2015-05-13 15:43 GMT+02:00 Ted Yu yuzhih...@gmail.com: You can use an exclusion to remove the undesired jetty version. Here is the syntax:

<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-scala_2.10</artifactId>
  <version>${fasterxml.jackson.version}</version>
  <exclusions>
    <exclusion>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </exclusion>
  </exclusions>
</dependency>

On Wed, May 13, 2015 at 6:41 AM, Paul Röwer paul.roewer1...@googlemail.com wrote: Okay. And how do I get it clean in my Maven project? On 13 May 2015 at 15:15:34 CEST, Ted Yu yuzhih...@gmail.com wrote: You can run the following command: mvn dependency:tree and see what jetty versions are brought in. Cheers On May 13, 2015, at 6:07 AM, Pa Rö paul.roewer1...@googlemail.com wrote: Hi, I use Spark and Flink in the same Maven project; now I get an exception when working with Spark, while Flink works well. The problem is transitive dependencies.
Re: spark streaming doubt
It will be a single job running at a time by default (you can also configure the spark.streaming.concurrentJobs to run jobs parallel which is not recommended to put in production). Now, your batch duration being 1 sec and processing time being 2 minutes, if you are using a receiver based streaming then ideally those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in block not found exceptions as spark drops some blocks which are yet to process to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold your data then it will run smoothly without any issues. Thanks Best Regards On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote: What happnes if in a streaming application one job is not yet finished and stream interval reaches. Does it starts next job or wait for first to finish and rest jobs will keep on accumulating in queue. Say I have a streaming application with stream interval of 1 sec, but my job takes 2 min to process 1 sec stream , what will happen ? At any time there will be only one job running or multiple ?
Spark 1.3.1 Performance Tuning/Patterns for Data Generation Heavy/Throughput Jobs
Hi all, I have a job that, for every row, creates about 20 new objects (i.e. RDD of 100 rows in = RDD 2000 rows out). The reason for this is each row is tagged with a list of the 'buckets' or 'windows' it belongs to. The actual data is about 10 billion rows. Each executor has 60GB of memory. Currently I have a mapPartitions task that is doing this object creation in a Scala Map and then returning the HashMap as an iterator via .toIterator. Is there a more efficient way to do this (assuming I can't use something like flatMap). The job runs (assuming each task size is small enough). But the GC time is understandably off the charts. I've reduced the spark cache memory percentage to 0.05 (as I just need space for a few broadcasts and this is a data churn task). I've left the shuffle memory percent unchanged. What kinds of settings should I be tuning with regards to GC? Looking at https://spark-summit.org/2014/wp-content/uploads/2015/03/SparkSummitEast2015-AdvDevOps-StudentSlides.pdf slide 125 recommends some settings but I'm not sure what would be best here). I tried using -XX:+UseG1GC but it pretty much causes my job to fail (all the executors die). Are there any tips with respect to the ratio of new gen and old gen space when creating lots of objects which will live in a data structure until the entire partition is processed? Any tips for tuning these kinds of jobs would be helpful! Thanks, ~N
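One thing that sometimes helps GC in this kind of row-explosion job is keeping mapPartitions fully lazy instead of materializing a map for the whole partition. A rough sketch, with a made-up bucketsFor function standing in for the window/bucket tagging; if the constraint is only on RDD.flatMap, the Iterator's own flatMap inside mapPartitions still keeps everything streaming one row at a time:

import org.apache.spark.rdd.RDD

// Hypothetical: returns the ~20 bucket/window ids a row belongs to.
def bucketsFor(row: String): Seq[Long] = Seq.empty

def explode(rows: RDD[String]): RDD[(Long, String)] =
  rows.mapPartitions { iter =>
    // No intermediate collection for the whole partition: records are
    // produced lazily as the input iterator is consumed.
    iter.flatMap(row => bucketsFor(row).iterator.map(b => (b, row)))
  }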
Re: py-files (and others?) not properly set up in cluster-mode Spark Yarn job?
Thanks for the quick response and confirmation, Marcelo, I just opened https://issues.apache.org/jira/browse/SPARK-7725. On Mon, May 18, 2015 at 9:02 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Shay, Yeah, that seems to be a bug; it doesn't seem to be related to the default FS nor compareFs either - I can reproduce this with HDFS when copying files from the local fs too. In yarn-client mode things seem to work. Could you file a bug to track this? If you don't have a jira account I can do that for you. On Mon, May 18, 2015 at 9:38 AM, Shay Rojansky r...@roji.org wrote: I'm having issues with submitting a Spark Yarn job in cluster mode when the cluster filesystem is file:///. It seems that additional resources (--py-files) are simply being skipped and not being added into the PYTHONPATH. The same issue may also exist for --jars, --files, etc. We use a simple NFS mount on all our nodes instead of HDFS. The problem is that when I submit a job that has files (via --py-files), these don't get copied across to the application's staging directory, nor do they get added to the PYTHONPATH. On startup, I can clearly see the message Source and destination file systems are the same. Not copying, which is a result of the check here: https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L221 The compareFs function simply looks whether the scheme, host and port are the same, and if so (my case), simply skips the copy. While that in itself isn't a problem, the PYTHONPATH isn't updated either. -- Marcelo
Re: spark streaming doubt
spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it to 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes. "Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted, this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future." Copied from TD's answer on SO: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Examples of non-receiver based streaming are the fileStream and directStream ones. You can read a bit of information from here https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html Thanks Best Regards On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all the pending jobs start one by one after the first job completes, or does it not create jobs which could not be started at their desired interval? And what's the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that? On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, with your batch duration being 1 sec and processing time being 2 minutes, if you are using receiver-based streaming then those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set the StorageLevel to MEMORY_ONLY and end up in block not found exceptions, as Spark drops some blocks that are yet to be processed in order to accommodate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold it, then it will run smoothly without any issues. Thanks Best Regards On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote: What happens if, in a streaming application, one job is not yet finished when the stream interval is reached? Does it start the next job, or wait for the first to finish while the remaining jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream, what will happen? At any time will there be only one job running, or multiple?
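For reference, a minimal sketch of how that experimental property can be set on the driver's SparkConf (the app name and the value 2 are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ConcurrentJobsExample")
  .set("spark.streaming.concurrentJobs", "2")  // experimental: allow 2 batch jobs at once

val ssc = new StreamingContext(conf, Seconds(1))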
Re: TwitterUtils on Windows
On 19 May 2015, at 03:08, Justin Pihony justin.pih...@gmail.com wrote: 15/05/18 22:03:14 INFO Executor: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with timestamp 1432000973058 15/05/18 22:03:14 INFO Utils: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to C:\Users\Justin\AppData\Local\Temp\spark-4a37d3 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:482) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja va:715) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443) at you're going to need to set up Hadoop on your system enough for to execute the chmod operation via the winutils.exe one tactic: grab the hortonworks windows version, install it (including setting up HADOOP_HOME). You don't need to run any of the hadoop services, you just need the binaries in the right place. other: 1. grab the copy of the relevant binaries which I've stuck up online https://github.com/steveloughran/clusterconfigs/tree/master/clusters/morzine/hadoop_home/bin 2. install to some directory hadoop/bin 3. set the env variable HADOOP_HOME to the hadoopp dir (not the bin one) 4. set PATH=%PATH%;%HADOOP_HOME%/bin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
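A related workaround that has worked for some people (an assumption on my part, not something from Steve's mail) is to point Hadoop at the winutils directory from inside the driver or shell instead of setting the system-wide environment variable:

// Run before creating the SparkContext / starting the stream.
// Assumes winutils.exe lives in C:\hadoop\bin (adjust the path to your install).
System.setProperty("hadoop.home.dir", "C:\\hadoop")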
Re: Working with slides. How do I know how many times a RDD has been processed?
I tried to insert a flag into the RDD, so I could set a counter in the last position; when the counter reaches X, I could do something. But in each slide the original RDD comes back, even though I modified it. I wrote this code to check whether this is possible, but it doesn't work:

val rdd1WithFlag = rdd1.map { register =>
  var splitRegister = register._2.split("\\|")
  var newArray = new Array[String](splitRegister.length + 1)
  if (splitRegister.length == 2) {
    splitRegister.copyToArray(newArray)
    newArray(splitRegister.length) = "0"
  } else {
    splitRegister(splitRegister.length) = "1"
    splitRegister.copyToArray(newArray)
  }
  (splitRegister(1), newArray)
}

If I check the length of splitRegister, it is always 2 in each slide; it is never three. 2015-05-18 15:36 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: Hi, I have two streaming RDDs, RDD1 and RDD2, and want to cogroup them. The data doesn't arrive at the same time and sometimes it can come with some delay. When I have all the data I want to insert it into MongoDB. For example, imagine that I get: RDD1 -- T 0 RDD2 -- T 0.5 I do a cogroup between them, but I can't store the result in Mongo yet because more data could come in the next windows/slides. RDD2' -- T 1.5 Another RDD2' comes; I only want to save to Mongo once. So, I should only save it when I have all the data. What I do know is the maximum time I should wait. Ideally, I would like to save to MongoDB in the last slide for each RDD, when I know that it is not possible to get more RDD2 to join with RDD1. Is it possible? How? Maybe there is another way to solve this problem, any idea?
AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?
Hi all, I might be missing something, but does the new Spark 1.3 sqlContext save interface support using Avro as the schema structure when writing Parquet files, in a similar way to AvroParquetWriter (which I've got working)? I've seen how you can load an avro file and save it as parquet from https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html, but not using the 2 together. Thanks, and apologies if I've missed something obvious! Ewan
How to use spark to access HBase with Security enabled
Hi, experts. I ran the HBaseTest program, which is an example from the Apache Spark source code, to learn how to use Spark to access HBase. But I got the following exception: Exception in thread main org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0 I also checked the RegionServer log of the host bgdt01.dev.hrb listed in the above exception. I found a few entries like the following one: 2015-05-19 16:59:11,143 DEBUG [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: RpcServer.listener,port=16020: Caught exception while reading:Authentication is required The above entry does not point to my program clearly, but the time is very close. Since my HBase version is HBase 1.0.0 and I have security enabled, I suspect the exception was caused by Kerberos authentication, but I am not sure. Does anybody know if my guess is right? And if I am right, could anybody tell me how to set up Kerberos authentication in a Spark program? I don't know how to do it. I already checked the API doc, but did not find any useful API. Many thanks! By the way, my Spark version is 1.3.0. I also paste the code of HBaseTest in the following:

***Source Code**

object HBaseTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseTest")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(0))

    // Initialize hBase table if necessary
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(args(0))) {
      val tableDesc = new HTableDescriptor(args(0))
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    sc.stop()
  }
}
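One approach that is often used for Kerberos-secured HBase on the driver side (a hedged sketch; the principal, keytab path, and config values are placeholders) is to log in explicitly through UserGroupInformation before creating the HBase configuration and RDD:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.security.UserGroupInformation

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hadoop.security.authentication", "kerberos")
hbaseConf.set("hbase.security.authentication", "kerberos")

UserGroupInformation.setConfiguration(hbaseConf)
UserGroupInformation.loginUserFromKeytab(
  "myuser@EXAMPLE.COM",                     // placeholder principal
  "/etc/security/keytabs/myuser.keytab")    // placeholder keytab path

Note that on YARN the executors may also need valid credentials, so this driver-side login alone is not guaranteed to be sufficient.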
Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?
That's right. Also, Spark SQL can automatically infer schema from JSON datasets. You don't need to specify an Avro schema: sqlContext.jsonFile(json/path).saveAsParquetFile(parquet/path) or with the new reader/writer API introduced in 1.4-SNAPSHOT: sqlContext.read.json(json/path).write.parquet(parquet/path) Cheng On 5/19/15 6:07 PM, Ewan Leith wrote: Thanks Cheng, that makes sense. So for new dataframe creation (not conversion from Avro but from JSON or CSV inputs) in Spark we shouldn’t worry about using Avro at all, just use the Spark SQL StructType when building new Dataframes? If so, that will be a lot simpler! Thanks, Ewan *From:*Cheng Lian [mailto:lian.cs@gmail.com] *Sent:* 19 May 2015 11:01 *To:* Ewan Leith; user@spark.apache.org *Subject:* Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces? Hi Ewan, Different from AvroParquetWriter, in Spark SQL we uses StructType as the intermediate schema format. So when converting Avro files to Parquet files, we internally converts Avro schema to Spark SQL StructType first, and then convert StructType to Parquet schema. Cheng On 5/19/15 4:42 PM, Ewan Leith wrote: Hi all, I might be missing something, but does the new Spark 1.3 sqlContext save interface support using Avro as the schema structure when writing Parquet files, in a similar way to AvroParquetWriter (which I’ve got working)? I've seen how you can load an avro file and save it as parquet fromhttps://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html, but not using the 2 together. Thanks, and apologies if I've missed something obvious! Ewan
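To make the StructType route concrete for a CSV-style input, a small sketch against the Spark 1.3 API (column names and paths are made up; assumes an existing SparkContext sc):

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

val rows = sc.textFile("csv/path")
  .map(_.split(","))
  .map(a => Row(a(0), a(1).trim.toInt))

sqlContext.createDataFrame(rows, schema).saveAsParquetFile("parquet/path")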
RE: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?
Thanks Cheng, that's brilliant, you've saved me a headache. Ewan From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: 19 May 2015 11:58 To: Ewan Leith; user@spark.apache.org Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces? That's right. Also, Spark SQL can automatically infer schema from JSON datasets. You don't need to specify an Avro schema: sqlContext.jsonFile(json/path).saveAsParquetFile(parquet/path) or with the new reader/writer API introduced in 1.4-SNAPSHOT: sqlContext.read.json(json/path).write.parquet(parquet/path) Cheng On 5/19/15 6:07 PM, Ewan Leith wrote: Thanks Cheng, that makes sense. So for new dataframe creation (not conversion from Avro but from JSON or CSV inputs) in Spark we shouldn't worry about using Avro at all, just use the Spark SQL StructType when building new Dataframes? If so, that will be a lot simpler! Thanks, Ewan From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: 19 May 2015 11:01 To: Ewan Leith; user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces? Hi Ewan, Different from AvroParquetWriter, in Spark SQL we uses StructType as the intermediate schema format. So when converting Avro files to Parquet files, we internally converts Avro schema to Spark SQL StructType first, and then convert StructType to Parquet schema. Cheng On 5/19/15 4:42 PM, Ewan Leith wrote: Hi all, I might be missing something, but does the new Spark 1.3 sqlContext save interface support using Avro as the schema structure when writing Parquet files, in a similar way to AvroParquetWriter (which I've got working)? I've seen how you can load an avro file and save it as parquet from https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html, but not using the 2 together. Thanks, and apologies if I've missed something obvious! Ewan
Re: Spark SQL on large number of columns
Hi, I am using spark 1.3.1 Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote: And which version are you using? Sent from my iPhone On 19 May 2015, at 18:29, ayan guha guha.a...@gmail.com wrote: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying to run a Spark SQL aggregation on a file with 26k columns. The number of rows is very small. I am running into an issue where Spark is taking a huge amount of time to parse the SQL and create a logical plan. Even if I have just one row, it's taking more than 1 hour just to get past the parsing. Any idea how to optimize these kinds of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Re: Spark SQL on large number of columns
And which version are you using? Sent from my iPhone On 19 May 2015, at 18:29, ayan guha guha.a...@gmail.com wrote: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying to run a Spark SQL aggregation on a file with 26k columns. The number of rows is very small. I am running into an issue where Spark is taking a huge amount of time to parse the SQL and create a logical plan. Even if I have just one row, it's taking more than 1 hour just to get past the parsing. Any idea how to optimize these kinds of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
RE: Spark 1.3.1 Performance Tuning/Patterns for Data Generation Heavy/Throughput Jobs
Is that a Spark or Spark Streaming application? Re the map transformation which is required, you can also try flatMap. Finally, an Executor is essentially a JVM spawned by a Spark Worker Node or YARN – giving 60GB RAM to a single JVM will certainly result in “off the charts” GC. I would suggest experimenting with the following two things: 1. Give less RAM to each Executor but have more Executors, including more than one Executor per Node, especially if the ratio of RAM to CPU Cores is favorable. 2. Use memory-serialized RDDs (MEMORY_ONLY_SER) – this still stores them in RAM but in Java serialized form, which greatly reduces the number of live objects the GC has to track; for keeping cached data off the JVM heap entirely, the OFF_HEAP storage level uses Tachyon – a distributed in-memory file system – and hence avoids GC of that data. From: Night Wolf [mailto:nightwolf...@gmail.com] Sent: Tuesday, May 19, 2015 9:36 AM To: user@spark.apache.org Subject: Spark 1.3.1 Performance Tuning/Patterns for Data Generation Heavy/Throughput Jobs Hi all, I have a job that, for every row, creates about 20 new objects (i.e. RDD of 100 rows in = RDD of 2000 rows out). The reason for this is each row is tagged with a list of the 'buckets' or 'windows' it belongs to. The actual data is about 10 billion rows. Each executor has 60GB of memory. Currently I have a mapPartitions task that is doing this object creation in a Scala Map and then returning the HashMap as an iterator via .toIterator. Is there a more efficient way to do this (assuming I can't use something like flatMap)? The job runs (assuming each task size is small enough), but the GC time is understandably off the charts. I've reduced the spark cache memory percentage to 0.05 (as I just need space for a few broadcasts and this is a data churn task). I've left the shuffle memory percent unchanged. What kinds of settings should I be tuning with regards to GC? Looking at https://spark-summit.org/2014/wp-content/uploads/2015/03/SparkSummitEast2015-AdvDevOps-StudentSlides.pdf, slide 125 recommends some settings but I'm not sure what would be best here. I tried using -XX:+UseG1GC but it pretty much causes my job to fail (all the executors die). Are there any tips with respect to the ratio of new gen and old gen space when creating lots of objects which will live in a data structure until the entire partition is processed? Any tips for tuning these kinds of jobs would be helpful! Thanks, ~N
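For concreteness, a short Scala sketch of the first suggestion plus serialized caching; the values are purely illustrative, not tuned recommendations:

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel

  val conf = new SparkConf()
    .set("spark.executor.memory", "20g")          // smaller heap per executor JVM
    .set("spark.executor.cores", "5")             // so several executors can share one node
    .set("spark.storage.memoryFraction", "0.05")  // tiny cache, as in the original post

  // and persist intermediate RDDs in serialized form to cut the number of objects the GC sees:
  // someRdd.persist(StorageLevel.MEMORY_ONLY_SER)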
Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?
Hi Ewan, Different from AvroParquetWriter, in Spark SQL we use StructType as the intermediate schema format. So when converting Avro files to Parquet files, we internally convert the Avro schema to a Spark SQL StructType first, and then convert the StructType to a Parquet schema. Cheng On 5/19/15 4:42 PM, Ewan Leith wrote: Hi all, I might be missing something, but does the new Spark 1.3 sqlContext save interface support using Avro as the schema structure when writing Parquet files, in a similar way to AvroParquetWriter (which I've got working)? I've seen how you can load an avro file and save it as parquet from https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html, but not using the two together. Thanks, and apologies if I've missed something obvious! Ewan
Spark SQL on large number of columns
Hi, I am trying to run a Spark SQL aggregation on a file with 26k columns. The number of rows is very small. I am running into an issue where Spark is taking a huge amount of time to parse the SQL and create a logical plan. Even if I have just one row, it's taking more than 1 hour just to get past the parsing. Any idea how to optimize these kinds of scenarios? Regards, Madhukara Phatak http://datamantra.io/
RE: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?
Thanks Cheng, that makes sense. So for new dataframe creation (not conversion from Avro but from JSON or CSV inputs) in Spark we shouldn't worry about using Avro at all, just use the Spark SQL StructType when building new DataFrames? If so, that will be a lot simpler! Thanks, Ewan From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: 19 May 2015 11:01 To: Ewan Leith; user@spark.apache.org Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces? Hi Ewan, Different from AvroParquetWriter, in Spark SQL we use StructType as the intermediate schema format. So when converting Avro files to Parquet files, we internally convert the Avro schema to a Spark SQL StructType first, and then convert the StructType to a Parquet schema. Cheng On 5/19/15 4:42 PM, Ewan Leith wrote: Hi all, I might be missing something, but does the new Spark 1.3 sqlContext save interface support using Avro as the schema structure when writing Parquet files, in a similar way to AvroParquetWriter (which I've got working)? I've seen how you can load an avro file and save it as parquet from https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html, but not using the two together. Thanks, and apologies if I've missed something obvious! Ewan
Re: Spark SQL on large number of columns
Hi, I have fields from field_0 to field_26000. The query is a select of max(cast($columnName as double)), min(cast($columnName as double)), avg(cast($columnName as double)), count(*) for all those 26000 fields in one query. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 3:59 PM, ayan guha guha.a...@gmail.com wrote: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying to run a Spark SQL aggregation on a file with 26k columns. The number of rows is very small. I am running into an issue where Spark is taking a huge amount of time to parse the SQL and create a logical plan. Even if I have just one row, it's taking more than 1 hour just to get past the parsing. Any idea how to optimize these kinds of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
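For context, a hypothetical Scala sketch of how such a statement can be assembled (column and table names are illustrative); with 26k columns the resulting string, and hence the logical plan, becomes enormous, which is what the analyzer and parser are spending their time on:

  val columns = (0 until 26000).map(i => s"field_$i")
  val perColumn = columns.map { c =>
    s"max(cast($c as double)), min(cast($c as double)), avg(cast($c as double))"
  }
  val sql = s"select ${perColumn.mkString(", ")}, count(*) from my_table"
  // sqlContext.sql(sql)  // one query with ~78k expressions for the analyzer to resolve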
Reading Binary files in Spark program
Hi Team, I am new to Spark and learning. I am trying to read image files into a spark job. This is how I am doing it: Step 1. Created sequence files with FileName as Key and Binary image as value, i.e. Text and BytesWritable. I am able to read these sequence files into Map Reduce programs. Step 2. I understand that Text and BytesWritable are non-serializable; therefore, I read the sequence file in Spark as follows: SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class); final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect(); The moment I try to call the collect() method to get the keys of the sequence file, the following exception is thrown. Can anyone help me understand why the collect() method is failing? If I use toArray() on the seqFiles object then I also get the same call stack. Regards Tapan java.io.NotSerializableException: org.apache.hadoop.io.Text at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 2015-05-19 15:15:03,705 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not retrying 2015-05-19 15:15:03,731 INFO [task-result-getter-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all completed, from pool 2015-05-19 15:15:03,739 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0 2015-05-19 15:15:03,747 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 0 failed: collect at JavaSequenceFile.java:44, took 4.421397 s Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: Spark SQL on large number of columns
can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying to run a Spark SQL aggregation on a file with 26k columns. The number of rows is very small. I am running into an issue where Spark is taking a huge amount of time to parse the SQL and create a logical plan. Even if I have just one row, it's taking more than 1 hour just to get past the parsing. Any idea how to optimize these kinds of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Re: Spark SQL on large number of columns
Hi, Some additional information: the table is backed by a CSV file which is read using spark-csv from Databricks. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:05 PM, madhu phatak phatak@gmail.com wrote: Hi, I have fields from field_0 to field_26000. The query is a select of max(cast($columnName as double)), min(cast($columnName as double)), avg(cast($columnName as double)), count(*) for all those 26000 fields in one query. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 3:59 PM, ayan guha guha.a...@gmail.com wrote: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying to run a Spark SQL aggregation on a file with 26k columns. The number of rows is very small. I am running into an issue where Spark is taking a huge amount of time to parse the SQL and create a logical plan. Even if I have just one row, it's taking more than 1 hour just to get past the parsing. Any idea how to optimize these kinds of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Re: How to use spark to access HBase with Security enabled
Which user did you run your program as? Have you granted the proper permissions on the hbase side? You should also check the master log to see if there is some clue. Cheers On May 19, 2015, at 2:41 AM, donhoff_h 165612...@qq.com wrote: Hi, experts. I ran the HBaseTest program, which is an example from the Apache Spark source code, to learn how to use spark to access HBase. But I met the following exception: Exception in thread main org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0 I also checked the RegionServer log of the host bgdt01.dev.hrb listed in the above exception. I found a few entries like the following one: 2015-05-19 16:59:11,143 DEBUG [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: RpcServer.listener,port=16020: Caught exception while reading:Authentication is required The above entry did not point to my program clearly, but the time is very near. Since my hbase version is HBase 1.0.0 and I set security enabled, I suspect the exception was caused by the Kerberos authentication. But I am not sure. Does anybody know if my guess is right? And if I am right, could anybody tell me how to set up Kerberos authentication in a spark program? I don't know how to do it. I already checked the API doc, but did not find any useful API. Many Thanks! By the way, my spark version is 1.3.0. I also paste the code of HBaseTest in the following: ***Source Code** object HBaseTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseTest") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, args(0)) // Initialize hBase table if necessary val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(args(0))) { val tableDesc = new HTableDescriptor(args(0)) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.count() sc.stop() } }
Re: Spark Job not using all nodes in cluster
What does your spark-env file say? Are you setting the number of executors in the spark context? On 20 May 2015 13:16, Shailesh Birari sbirar...@gmail.com wrote: Hi, I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB of RAM. I have around 600,000+ Json files on HDFS. Each file is small, around 1KB in size. Total data is around 16GB. Hadoop block size is 256MB. My application reads these files with the sc.textFile() (or sc.jsonFile(); tried both) API. But all the files are getting read by only one node (4 executors). Spark UI shows all 600K+ tasks on one node and 0 on other nodes. I confirmed that all files are accessible from all nodes. Some other application which uses big files uses all nodes on the same cluster. Can you please let me know why it is behaving in such a way? Thanks, Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-using-all-nodes-in-cluster-tp22951.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark logo license
Check out Apache's trademark guidelines here: http://www.apache.org/foundation/marks/ Matei On May 20, 2015, at 12:02 AM, Justin Pihony justin.pih...@gmail.com wrote: What is the license on using the spark logo. Is it free to be used for displaying commercially? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark logo license
Thanks! On Wed, May 20, 2015 at 12:41 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Check out Apache's trademark guidelines here: http://www.apache.org/foundation/marks/ Matei On May 20, 2015, at 12:02 AM, Justin Pihony justin.pih...@gmail.com wrote: What is the license on using the spark logo. Is it free to be used for displaying commercially? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Hive on Spark VS Spark SQL
Between Hive on Spark and Spark SQL, which is better, and what are the key characteristics, advantages, and disadvantages of each? guoqing0...@yahoo.com.hk
Re: Spark Streaming to Kafka
I think this is the PR you could refer to: https://github.com/apache/spark/pull/2994 2015-05-20 13:41 GMT+08:00 twinkle sachdeva twinkle.sachd...@gmail.com: Hi, As Spark Streaming is nicely integrated with consuming messages from Kafka, I thought of asking the forum: is there any implementation available for pushing data to Kafka from Spark Streaming too? Any link(s) will be helpful. Thanks and Regards, Twinkle
Re: Reading Binary files in Spark program
Thanks. I will try and let you know. But what exactly is the issue? Any pointers? Regards Tapan On Tue, May 19, 2015 at 6:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try something like: JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir, org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class, Text.class, new Job().getConfiguration()); With the type of input format that you require. Thanks Best Regards On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com wrote: Hi Team, I am new to Spark and learning. I am trying to read image files into a spark job. This is how I am doing it: Step 1. Created sequence files with FileName as Key and Binary image as value, i.e. Text and BytesWritable. I am able to read these sequence files into Map Reduce programs. Step 2. I understand that Text and BytesWritable are non-serializable; therefore, I read the sequence file in Spark as follows: SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class); final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect(); The moment I try to call the collect() method to get the keys of the sequence file, the following exception is thrown. Can anyone help me understand why the collect() method is failing? If I use toArray() on the seqFiles object then I also get the same call stack. Regards Tapan java.io.NotSerializableException: org.apache.hadoop.io.Text at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 2015-05-19 15:15:03,705 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not retrying 2015-05-19 15:15:03,731 INFO [task-result-getter-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all completed, from pool 2015-05-19 15:15:03,739 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0 2015-05-19 15:15:03,747 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 0 failed: collect at JavaSequenceFile.java:44, took 4.421397 s Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456)
Spark Streaming to Kafka
Hi, As Spark Streaming is nicely integrated with consuming messages from Kafka, I thought of asking the forum: is there any implementation available for pushing data to Kafka from Spark Streaming too? Any link(s) will be helpful. Thanks and Regards, Twinkle
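There is no built-in Kafka sink in the 1.x releases; a common hand-rolled sketch (assuming the old Kafka 0.8 producer API on the classpath, a placeholder broker and topic, and some dstream of records) creates one producer per partition inside foreachRDD:

  import java.util.Properties
  import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val props = new Properties()
      props.put("metadata.broker.list", "broker1:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val producer = new Producer[String, String](new ProducerConfig(props))
      // send each record of this partition, then release the producer
      records.foreach(r => producer.send(new KeyedMessage[String, String]("my-topic", r.toString)))
      producer.close()
    }
  }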
sparkSQL - Hive metastore connection hangs with MS SQL server
Hi all, I am trying to setup an external metastore using Microsoft SQL on Azure, it works ok initially but after about 5 mins inactivity it hangs, then times out after 15 mins with this error: 15/05/20 00:02:49 ERROR ConnectionHandle: Database access problem. Killing off this connection and all remaining connections in the connection pool. SQL State = 08S01 15/05/20 00:02:49 ERROR RetryingHMSHandler: Retrying HMSHandler after 1000 ms (attempt 1 of 1) with error: javax.jdo.JDODataStoreException: SQL Server did not return a response. The connection has been closed. at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:451) at org.datanucleus.api.jdo.JDOQuery.execute(JDOQuery.java:275) at org.apache.hadoop.hive.metastore.ObjectStore.getMTable(ObjectStore.java:901) at org.apache.hadoop.hive.metastore.ObjectStore.getTable(ObjectStore.java:833) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ... NestedThrowablesStackTrace: com.microsoft.sqlserver.jdbc.SQLServerException: SQL Server did not return a response. The connection has been closed. at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1668) at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1655) at com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:4844) I have also tried replacing BoneCP with DBCP in datanucleus.connectionPoolingType, that didn't help either. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sparkSQL-Hive-metastore-connection-hangs-with-MS-SQL-server-tp22950.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark users
Hi, I'm learning Spark focused on data and machine learning, migrating from SAS. Is there a group for it? My questions are basic for now and I am getting very few answers. Tal Rick. Sent from my Samsung Galaxy smartphone.
Re: How to use spark to access HBase with Security enabled
Sorry, this ref does not help me. I have set up the configuration in hbase-site.xml. But it seems there are still some extra configurations to be set or APIs to be called to make my spark program able to pass the authentication with HBase. Does anybody know how to set up authentication to a secured HBase in a spark program which uses the API newAPIHadoopRDD to get information from HBase? Many Thanks! ------ Original Message ------ From: yuzhihong yuzhih...@gmail.com Date: 19 May 2015 (Tue) 9:54 To: donhoff_h 165612...@qq.com Cc: user user@spark.apache.org Subject: Re: How to use spark to access HBase with Security enabled Please take a look at: http://hbase.apache.org/book.html#_client_side_configuration_for_secure_operation Cheers On Tue, May 19, 2015 at 5:23 AM, donhoff_h 165612...@qq.com wrote: The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my spark programs. I am sure I have run the kinit command to make it take effect. And I also used the HBase Shell to verify that this user has the right to scan and put the tables in HBase. Now I still have no idea how to solve this problem. Can anybody help me to figure it out? Many Thanks! ------ Original Message ------ From: yuzhihong yuzhih...@gmail.com Date: 19 May 2015 (Tue) 7:55 To: donhoff_h 165612...@qq.com Cc: user user@spark.apache.org Subject: Re: How to use spark to access HBase with Security enabled Which user did you run your program as? Have you granted the proper permissions on the hbase side? You should also check the master log to see if there is some clue. Cheers On May 19, 2015, at 2:41 AM, donhoff_h 165612...@qq.com wrote: Hi, experts. I ran the HBaseTest program, which is an example from the Apache Spark source code, to learn how to use spark to access HBase. But I met the following exception: Exception in thread main org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0 I also checked the RegionServer log of the host bgdt01.dev.hrb listed in the above exception. I found a few entries like the following one: 2015-05-19 16:59:11,143 DEBUG [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: RpcServer.listener,port=16020: Caught exception while reading:Authentication is required The above entry did not point to my program clearly, but the time is very near. Since my hbase version is HBase 1.0.0 and I set security enabled, I suspect the exception was caused by the Kerberos authentication. But I am not sure. Does anybody know if my guess is right? And if I am right, could anybody tell me how to set up Kerberos authentication in a spark program? I don't know how to do it. I already checked the API doc, but did not find any useful API. Many Thanks! By the way, my spark version is 1.3.0.
I also paste the code of HBaseTest in the following: ***Source Code** object HBaseTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseTest") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, args(0)) // Initialize hBase table if necessary val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(args(0))) { val tableDesc = new HTableDescriptor(args(0)) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.count() sc.stop() } }
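For the Kerberos part, a heavily hedged sketch of the usual client-side login step (the principal and keytab path are placeholders; executors on a real secure cluster may additionally need HBase delegation tokens, which this does not cover):

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.security.UserGroupInformation

  val hbaseConf = HBaseConfiguration.create()   // picks up hbase-site.xml with the security settings
  UserGroupInformation.setConfiguration(hbaseConf)
  // log the driver in from a keytab before creating HBaseAdmin / newAPIHadoopRDD
  UserGroupInformation.loginUserFromKeytab("your-principal@YOUR.REALM", "/path/to/your.keytab")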
RE: Spark sql error while writing Parquet file- Trying to write more fields than contained in row
Hi, Thanks for the response. I was looking for a java solution. I will check the scala and python ones. Regards, Anand.C From: Todd Nist [mailto:tsind...@gmail.com] Sent: Tuesday, May 19, 2015 6:17 PM To: Chandra Mohan, Ananda Vel Murugan Cc: ayan guha; user Subject: Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row I believe you're looking for df.na.fill in scala; in the pySpark module it is fillna (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html). From the docs:
df4.fillna({'age': 50, 'name': 'unknown'}).show()
age height name
10  80     Alice
5   null   Bob
50  null   Tom
50  null   unknown
On Mon, May 18, 2015 at 11:01 PM, Chandra Mohan, Ananda Vel Murugan ananda.muru...@honeywell.com wrote: Hi, Thanks for the response. But I could not see a fillna function in the DataFrame class. Is it available in some specific version of Spark SQL? This is what I have in my pom.xml: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.3.1</version> </dependency> Regards, Anand.C From: ayan guha [mailto:guha.a...@gmail.com] Sent: Monday, May 18, 2015 5:19 PM To: Chandra Mohan, Ananda Vel Murugan; user Subject: Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row Hi Give a try with the dataFrame.fillna function to fill up the missing column Best Ayan On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan ananda.muru...@honeywell.com wrote: Hi, I am using spark-sql to read a CSV file and write it as a parquet file. I am building the schema using the following code. String schemaString = "a b c"; List<StructField> fields = new ArrayList<StructField>(); MetadataBuilder mb = new MetadataBuilder(); mb.putBoolean("nullable", true); Metadata m = mb.build(); for (String fieldName : schemaString.split(" ")) { fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m)); } StructType schema = DataTypes.createStructType(fields); Some of the rows in my input csv do not contain three columns. After building my JavaRDD<Row>, I create the data frame as shown below using the RDD and schema. DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema); Finally I try to save it as a Parquet file darDataFrame.saveAsParquetFile("/home/anand/output.parquet") I get this error when saving it as a Parquet file: java.lang.IndexOutOfBoundsException: Trying to write more fields than contained in row (3 > 2) I understand the reason behind this error. Some of my rows in the Row RDD do not contain three elements, as some rows in my input csv do not contain three columns. But while building the schema, I am specifying every field as nullable. So I believe it should not throw this error. Can anyone help me fix this error? Thank you. Regards, Anand.C
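One way around the IndexOutOfBoundsException itself (a sketch, not from the thread, written in Scala for brevity; the same idea applies with RowFactory.create in the Java API) is to pad every parsed line so each Row carries exactly as many values as the schema declares; missing columns become nulls, which the nullable schema accepts:

  import org.apache.spark.sql.Row

  val numFields = schema.fields.length   // schema is the StructType built above
  val rowRDD = sc.textFile("input.csv").map { line =>
    val parts = line.split(",", -1)      // delimiter is illustrative
    val values = (0 until numFields).map { i =>
      if (i < parts.length && parts(i).nonEmpty) parts(i).toDouble else null
    }
    Row.fromSeq(values)
  }
  val df = sqlContext.createDataFrame(rowRDD, schema)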
Spark Job not using all nodes in cluster
Hi, I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB of RAM. I have around 600,000+ Json files on HDFS. Each file is small, around 1KB in size. Total data is around 16GB. Hadoop block size is 256MB. My application reads these files with the sc.textFile() (or sc.jsonFile(); tried both) API. But all the files are getting read by only one node (4 executors). Spark UI shows all 600K+ tasks on one node and 0 on other nodes. I confirmed that all files are accessible from all nodes. Some other application which uses big files uses all nodes on the same cluster. Can you please let me know why it is behaving in such a way? Thanks, Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-using-all-nodes-in-cluster-tp22951.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
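One thing worth trying (a sketch, not a confirmed fix for this particular cluster): ask for more partitions when reading, or repartition afterwards, so the work can be scheduled across all four nodes; the paths and partition counts are illustrative:

  // request a minimum number of partitions when reading the small files
  val raw = sc.textFile("hdfs:///path/to/json/*", minPartitions = 400)

  // or spread an already-read RDD across the cluster
  val spread = sc.textFile("hdfs:///path/to/json/*").repartition(400)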
Re: Reading Binary files in Spark program
The problem is still there. The exception is not coming at the time of reading. Also the count of the JavaPairRDD is as expected. It is when we call the collect() or toArray() methods that the exception comes. Something to do with the Text class even though I haven't used it in the program. Regards Tapan On Tue, May 19, 2015 at 6:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try something like: JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir, org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class, Text.class, new Job().getConfiguration()); With the type of input format that you require. Thanks Best Regards On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com wrote: Hi Team, I am new to Spark and learning. I am trying to read image files into a spark job. This is how I am doing it: Step 1. Created sequence files with FileName as Key and Binary image as value, i.e. Text and BytesWritable. I am able to read these sequence files into Map Reduce programs. Step 2. I understand that Text and BytesWritable are non-serializable; therefore, I read the sequence file in Spark as follows: SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class); final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect(); The moment I try to call the collect() method to get the keys of the sequence file, the following exception is thrown. Can anyone help me understand why the collect() method is failing? If I use toArray() on the seqFiles object then I also get the same call stack. Regards Tapan java.io.NotSerializableException: org.apache.hadoop.io.Text at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 2015-05-19 15:15:03,705 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not retrying 2015-05-19 15:15:03,731 INFO [task-result-getter-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all completed, from pool 2015-05-19 15:15:03,739 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0 2015-05-19 15:15:03,747 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 0 failed: collect at JavaSequenceFile.java:44, took 4.421397 s Exception in thread main
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at
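A common workaround for exactly this NotSerializableException (a sketch, not from the thread; written in Scala, the same conversion works with mapToPair in the Java API) is to keep the Hadoop Writable types inside the RDD transformations and convert them to plain serializable types before collect():

  import org.apache.hadoop.io.{Text, BytesWritable}

  val seq = sc.sequenceFile(args(0), classOf[Text], classOf[BytesWritable])
  // Text / BytesWritable are not java.io.Serializable, so map them to String / Array[Byte] first
  val plain = seq.map { case (k, v) =>
    (k.toString, java.util.Arrays.copyOfRange(v.getBytes, 0, v.getLength))
  }
  val collected = plain.collect()   // now the task results can be serialized back to the driver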
Re: EOFException using KryoSerializer
Hi Jim, this is definitely strange. It sure sounds like a bug, but it also is a very commonly used code path, so at the very least you must be hitting a corner case. Could you share a little more info with us? What version of spark are you using? How big is the object you are trying to broadcast? Can you share more of the logs from before the exception? It is not too surprising that this shows up in mesos but not in local mode. Local mode never exercises the part of the code that needs to deserialize the blocks of a broadcast variable (though it actually does serialize the data into blocks). So I doubt it's mesos specific; more likely it would happen in any cluster mode -- yarn, standalone, or even local-cluster (a pseudo-cluster just for testing). Imran On Tue, May 19, 2015 at 3:56 PM, Jim Carroll jimfcarr...@gmail.com wrote: I'm seeing the following exception ONLY when I run on a Mesos cluster. If I run the exact same code with master set to local[N] I have no problem: 2015-05-19 16:45:43,484 [task-result-getter-0] WARN TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.253.1.101): java.io.EOFException at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) KryoSerializer explicitly throws an EOFException. The comment says: // DeserializationStream uses the EOF exception to indicate stopping condition. Apparently this isn't what TorrentBroadcast expects. Any suggestions? Thanks. Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark 1.3.1 jars in repo1.maven.org
Hi, I'd like to confirm an observation I've just made. Specifically that spark is only available in repo1.maven.org for one Hadoop variant. The Spark source can be compiled against a number of different Hadoops using profiles. Yay. However, the spark jars in repo1.maven.org appear to be compiled against one specific Hadoop and no other differentiation is made. (I can see a difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in the version I compiled locally). The implication here is that if you have a pom file asking for spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2 version. Maven assumes that non-snapshot artifacts never change so trying to load an Hadoop 1 version will end in tears. This then means that if you compile code against spark-core then there will probably be classpath NoClassDefFound issues unless the Hadoop 2 version is exactly the one you want. Have I gotten this correct? It happens that our little app is using a Spark context directly from a Jetty webapp and the classpath differences were/are causing some confusion. We are currently installing a Hadoop 1 spark master and worker. Thanks a lot! Edward
Re: spark 1.3.1 jars in repo1.maven.org
I think your observation is correct. e.g. http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.3.1 shows that it depends on hadoop-client http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client from hadoop 2.2 Cheers On Tue, May 19, 2015 at 6:17 PM, Edward Sargisson esa...@pobox.com wrote: Hi, I'd like to confirm an observation I've just made. Specifically that spark is only available in repo1.maven.org for one Hadoop variant. The Spark source can be compiled against a number of different Hadoops using profiles. Yay. However, the spark jars in repo1.maven.org appear to be compiled against one specific Hadoop and no other differentiation is made. (I can see a difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in the version I compiled locally). The implication here is that if you have a pom file asking for spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2 version. Maven assumes that non-snapshot artifacts never change so trying to load an Hadoop 1 version will end in tears. This then means that if you compile code against spark-core then there will probably be classpath NoClassDefFound issues unless the Hadoop 2 version is exactly the one you want. Have I gotten this correct? It happens that our little app is using a Spark context directly from a Jetty webapp and the classpath differences were/are causing some confusion. We are currently installing a Hadoop 1 spark master and worker. Thanks a lot! Edward
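For what it's worth, a hedged build.sbt sketch of pinning the Hadoop side yourself by excluding the transitive hadoop-client that spark-core pulls in (versions are illustrative; whether the published Hadoop 2 build of spark-core actually runs cleanly against a Hadoop 1 client is a separate question):

  libraryDependencies ++= Seq(
    ("org.apache.spark" %% "spark-core" % "1.3.1").exclude("org.apache.hadoop", "hadoop-client"),
    "org.apache.hadoop" % "hadoop-client" % "1.0.4"
  )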
Spark logo license
What is the license on using the spark logo? Is it free to be used for display commercially? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Find KNN in Spark SQL
The batch version of this is part of the rowSimilarities JIRA 4823 ... if your query points can fit in memory there is a broadcast version which we are experimenting with internally. We are using brute-force KNN right now in the PR ... based on the FLANN paper; LSH did not work well, but before you go to approximate KNN you have to make sure your top-k precision/recall is not degrading compared to brute force in your CV flow ... I have not yet extracted a KNN model but that will use the IndexedRowMatrix changes that we put in the PR. On May 19, 2015 12:58 PM, Xiangrui Meng men...@gmail.com wrote: Spark SQL doesn't provide spatial features. Large-scale KNN is usually combined with locality-sensitive hashing (LSH). This Spark package may be helpful: http://spark-packages.org/package/mrsqueeze/spark-hash. -Xiangrui On Sat, May 9, 2015 at 9:25 PM, Dong Li lid...@lidong.net.cn wrote: Hello experts, I'm new to Spark, and want to find K nearest neighbors on a huge-scale, high-dimension points dataset in very short time. The scenario is: the dataset contains more than 10 million points, whose dimension is 200d. I'm building a web service to receive one new point at each request and return the K nearest points inside that dataset, and I also need to ensure the time cost is not very high. I have a cluster with several high-memory nodes for this service. Currently I only have these ideas: 1. To create several ball-tree instances in each node when the service initializes. This is fast, but does not scale well with the data. I cannot insert new nodes into the ball-trees unless I restart the services and rebuild them. 2. To use a SQL based solution. Some databases like PostgreSQL and SqlServer have features for spatial search, but these databases may not perform well in a big data environment. (Does SparkSQL have spatial features or a spatial index?) Based on your experience, can I achieve this scenario in Spark SQL? Or do you know other projects in the Spark stack that work well for this? Any ideas are appreciated, thanks very much. Regards, Dong - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
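For the brute-force route, a minimal Scala sketch (dense Array[Double] points, squared Euclidean distance, broadcast query point; this is not the rowSimilarities PR code):

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  def knn(sc: SparkContext, points: RDD[(Long, Array[Double])],
          query: Array[Double], k: Int): Array[(Long, Double)] = {
    val q = sc.broadcast(query)
    def sqDist(a: Array[Double], b: Array[Double]): Double = {
      var s = 0.0; var i = 0
      while (i < a.length) { val d = a(i) - b(i); s += d * d; i += 1 }
      s
    }
    // takeOrdered keeps the k smallest distances per partition and merges them on the driver
    points.mapValues(p => sqDist(p, q.value))
          .takeOrdered(k)(Ordering.by[(Long, Double), Double](_._2))
  }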
Re: Spark SQL on large number of columns
Hi, Tested for calculating values for 300 columns. The analyser takes around 4 minutes to generate the plan. Is this normal? Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com wrote: Hi, I am using spark 1.3.1 Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote: And which version are you using? Sent from my iPhone On 19 May 2015, at 18:29, ayan guha guha.a...@gmail.com wrote: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying to run a Spark SQL aggregation on a file with 26k columns. The number of rows is very small. I am running into an issue where Spark is taking a huge amount of time to parse the SQL and create a logical plan. Even if I have just one row, it's taking more than 1 hour just to get past the parsing. Any idea how to optimize these kinds of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row
I believe you're looking for df.na.fill in scala; in the pySpark module it is fillna (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html). From the docs:
df4.fillna({'age': 50, 'name': 'unknown'}).show()
age height name
10  80     Alice
5   null   Bob
50  null   Tom
50  null   unknown
On Mon, May 18, 2015 at 11:01 PM, Chandra Mohan, Ananda Vel Murugan ananda.muru...@honeywell.com wrote: Hi, Thanks for the response. But I could not see a fillna function in the DataFrame class. Is it available in some specific version of Spark SQL? This is what I have in my pom.xml: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.3.1</version> </dependency> Regards, Anand.C *From:* ayan guha [mailto:guha.a...@gmail.com] *Sent:* Monday, May 18, 2015 5:19 PM *To:* Chandra Mohan, Ananda Vel Murugan; user *Subject:* Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row Hi Give a try with the dataFrame.fillna function to fill up the missing column Best Ayan On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan ananda.muru...@honeywell.com wrote: Hi, I am using spark-sql to read a CSV file and write it as a parquet file. I am building the schema using the following code. String schemaString = "a b c"; List<StructField> fields = new ArrayList<StructField>(); MetadataBuilder mb = new MetadataBuilder(); mb.putBoolean("nullable", true); Metadata m = mb.build(); for (String fieldName : schemaString.split(" ")) { fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m)); } StructType schema = DataTypes.createStructType(fields); Some of the rows in my input csv do not contain three columns. After building my JavaRDD<Row>, I create the data frame as shown below using the RDD and schema. DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema); Finally I try to save it as a Parquet file darDataFrame.saveAsParquetFile("/home/anand/output.parquet") I get this error when saving it as a Parquet file: java.lang.IndexOutOfBoundsException: Trying to write more fields than contained in row (3 > 2) I understand the reason behind this error. Some of my rows in the Row RDD do not contain three elements, as some rows in my input csv do not contain three columns. But while building the schema, I am specifying every field as nullable. So I believe it should not throw this error. Can anyone help me fix this error? Thank you. Regards, Anand.C -- Best Regards, Ayan Guha
Re: Broadcast variables can be rebroadcast?
hmm, I guess it depends on the way you look at it. In a way, I'm saying that spark does *not* have any built-in auto-re-broadcast if you try to mutate a broadcast variable. Instead, you should create something new, and just broadcast it separately. Then just have all the code you have operating on your RDDs look at the new broadcast variable. But I guess there is another way to look at it -- you are creating new broadcast variables each time, but they all point to the same underlying mutable data structure. So in a way, you are rebroadcasting the same underlying data structure. Let me expand my example from earlier a little bit more:
def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = { ... }
// this is a val, because the data structure itself is mutable
val myMutableDataStructure = ...
// this is a var, because you will create new broadcasts
var myBroadcast = sc.broadcast(myMutableDataStructure)
(0 to 20).foreach { iteration =>
  oneIteration(myRDD, myBroadcast)
  // update your mutable data structure in place
  myMutableDataStructure.update(...)
  // ... but that doesn't affect the broadcast variables living out on the cluster, so we need to
  // create a new one
  // this line is not required -- the broadcast var will automatically get unpersisted when a gc
  // cleans up the old broadcast on the driver, but I'm including this here for completeness,
  // in case you want to more proactively clean up old blocks if you are low on space
  myBroadcast.unpersist()
  // now we create a new broadcast which has the updated data in our mutable data structure
  myBroadcast = sc.broadcast(myMutableDataStructure)
}
hope this clarifies things! Imran On Tue, May 19, 2015 at 3:06 AM, N B nb.nos...@gmail.com wrote: Hi Imran, If I understood you correctly, you are suggesting to simply call broadcast again from the driver program. This is exactly what I am hoping will work as I have the Broadcast data wrapped up and I am indeed (re)broadcasting the wrapper over again when the underlying data changes. However, the documentation seems to suggest that one cannot re-broadcast. Is my understanding accurate? Thanks NB On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com wrote: Rather than updating the broadcast variable, can't you simply create a new one? When the old one can be gc'ed in your program, it will also get gc'ed from spark's cache (and all executors). I think this will make your code *slightly* more complicated, as you need to add in another layer of indirection for which broadcast variable to use, but not too bad. Eg., instead of
var myBroadcast = sc.broadcast( ... )
(0 to 20).foreach { iteration =>
  // ... some rdd operations that involve myBroadcast ...
  myBroadcast.update(...) // wrong! don't update a broadcast variable
}
do something like:
def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = { ... }
var myBroadcast = sc.broadcast(...)
(0 to 20).foreach { iteration =>
  oneIteration(myRDD, myBroadcast)
  myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with whatever you need to update it
}
However, a new value will be shipped to any new executor that comes into play after the value has changed. This way, changing the value of a broadcast variable is not a good idea as it can create inconsistency within the cluster. From the documentation: In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote: Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer. If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks NB -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to use spark to access HBase with Security enabled
The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my spark programs. I am sure I have run the kinit command to make it take effect. And I also used the HBase Shell to verify that this user has the right to scan and put the tables in HBase. Now I still have no idea how to solve this problem. Can anybody help me to figure it out? Many Thanks! ------ Original Message ------ From: yuzhihong <yuzhih...@gmail.com>; Sent: Tuesday, May 19, 2015, 7:55 PM; To: donhoff_h <165612...@qq.com>; Cc: user <user@spark.apache.org>; Subject: Re: How to use spark to access HBase with Security enabled Which user did you run your program as ? Have you granted proper permission on hbase side ? You should also check the master log to see if there was some clue. Cheers On May 19, 2015, at 2:41 AM, donhoff_h 165612...@qq.com wrote: Hi, experts. I ran the HBaseTest program, which is an example from the Apache Spark source code, to learn how to use spark to access HBase. But I met the following exception: Exception in thread "main" org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0 I also checked the RegionServer log of the host bgdt01.dev.hrb listed in the above exception. I found a few entries like the following one: 2015-05-19 16:59:11,143 DEBUG [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: RpcServer.listener,port=16020: Caught exception while reading:Authentication is required The above entry does not point to my program clearly, but the time is very near. Since my HBase version is HBase 1.0.0 and I set security enabled, I suspect the exception was caused by the Kerberos authentication, but I am not sure. Does anybody know if my guess is right? And if I am right, could anybody tell me how to set up Kerberos authentication in a spark program? I don't know how to do it. I already checked the API doc, but did not find any API that looked useful. Many Thanks! By the way, my spark version is 1.3.0. I also paste the code of HBaseTest in the following: ***Source Code** object HBaseTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseTest") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, args(0)) // Initialize hBase table if necessary val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(args(0))) { val tableDesc = new HTableDescriptor(args(0)) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.count() sc.stop() } }
RE: Decision tree: categorical variables
Hi, can you please share how you resolved the parsing issue? It would be of great help... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-categorical-variables-tp12433p22943.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL on large number of columns
Hi, Another update, when run on more that 1000 columns I am getting Could not write class __wrapper$1$40255d281a0d4eacab06bcad6cf89b0d/__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d$$anonfun$wrapper$1$$anon$1 because it exceeds JVM code size limits. Method apply's code too large! Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 6:23 PM, madhu phatak phatak@gmail.com wrote: Hi, Tested with HiveContext also. It also take similar amount of time. To make the things clear, the following is select clause for a given column *aggregateStats( $columnName , max( cast($columnName as double)), |min(cast($columnName as double)), avg(cast($columnName as double)), count(*) )* aggregateStats is UDF generating case class to hold the values. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 5:57 PM, madhu phatak phatak@gmail.com wrote: Hi, Tested for calculating values for 300 columns. Analyser takes around 4 minutes to generate the plan. Is this normal? Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com wrote: Hi, I am using spark 1.3.1 Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote: And which version are you using 发自我的 iPhone 在 2015年5月19日,18:29,ayan guha guha.a...@gmail.com 写道: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying run spark sql aggregation on a file with 26k columns. No of rows is very small. I am running into issue that spark is taking huge amount of time to parse the sql and create a logical plan. Even if i have just one row, it's taking more than 1 hour just to get pass the parsing. Any idea how to optimize in these kind of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
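A hedged workaround sketch for the code-size error above (not something proposed in the thread): rather than generating one SELECT with tens of thousands of expressions, split the column list into batches and run one small aggregation per batch, combining the results on the driver. It uses plain max/min/avg instead of the custom aggregateStats UDF and assumes a DataFrame named df on the Spark 1.3 API:

// Sketch only: keep each generated plan small enough to stay under the JVM's method-size limit
val columns: Seq[String] = df.columns.toSeq

val perBatchStats = columns.grouped(100).map { batch =>
  val exprs = batch.flatMap { c =>
    Seq(s"max(cast($c as double)) as ${c}_max",
        s"min(cast($c as double)) as ${c}_min",
        s"avg(cast($c as double)) as ${c}_avg")
  }
  // one narrow aggregate per batch of columns
  df.selectExpr(exprs: _*).collect().head
}.toList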
Re: Reading Real Time Data only from Kafka
Cool. Thanks for the detailed response Cody. Thanks Best Regards On Tue, May 19, 2015 at 6:43 PM, Cody Koeninger c...@koeninger.org wrote: If those questions aren't answered by https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md please let me know so I can update it. If you set auto.offset.reset to largest, it will start at the largest offset. Any messages before that will be skipped, so if prior runs of the job didn't consume them, they're lost. KafkaRDD / DirectStream doesn't make any scheduling decisions (aside from a locality hint if you have kafka running on the same node as spark), and it doesn't have any long-running receivers. Executors get whatever partitions the normal scheduler decides they should get. If an executor fails, a different executor reads the offset range for the failed partition; they're immutable, so no difference in result. Deciding where to save offsets (or not) is up to you. You can checkpoint, or store them yourself. On Mon, May 18, 2015 at 12:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I have played a bit with the directStream kafka api. Good work cody. These are my findings and also can you clarify a few things for me (see below). - When auto.offset.reset- smallest and you have 60GB of messages in Kafka, it takes forever as it reads the whole 60GB at once. largest will only read the latest messages. - To avoid this, you can actually limit the rate with spark.streaming.kafka.maxRatePerPartition, which is pretty stable (Always reads the same amount of data). - Number of partitions per batch = number of kafka partitions. - In the case of driver failures, offset reset being set to smallest will replay the whole messages and largest will only read those messages which are pushed after the streaming job has started. What happens to those messages which arrive in between? *Few things which are unclear:* - If we have a kafka topic with 9 partitions, and spark cluster with 3 slaves, how does it decides which slave should read from which partition? And what happens if a single slave fails while reading the data? - By default it doesn't push the offsets of messages which are read anywhere, then how does it replay the message in case of failures? Thanks Best Regards On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger c...@koeninger.org wrote: You linked to a google mail tab, not a public archive, so I don't know exactly which conversation you're referring to. As far as I know, streaming only runs a single job at a time in the order they were defined, unless you turn on an experimental option for more parallelism (TD or someone more knowledgeable can chime in on this). If you're talking about the possibility of the next job starting before the prior one has fully finished, because your processing is lagging behind... I'm not 100% sure this is possible because I've never observed it. The thing is, it's a moot point, because if you're saving offsets yourself transactionally, you already need to be verifying that offsets are correct (increasing without gaps) in order to handle restarts correctly. If you're super concerned about how batches get generated, the direct api gives you access to KafkaUtils.createRDD... just schedule your own rdds in the order you want. Again, flexible. On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Thanks Cody for your email. I think my concern was not to get the ordering of message within a partition , which as you said is possible if one knows how Spark works. 
The issue is how Spark schedule jobs on every batch which is not on the same order they generated. So if that is not guaranteed it does not matter if you manege order within your partition. So depends on par-partition ordering to commit offset may leads to offsets commit in wrong order. In this thread you have discussed this as well and some workaround : https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15 So again , one need to understand every details of a Consumer to take a decision if that solves their use case. Regards, Dibyendu On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org wrote: As far as I can tell, Dibyendu's cons boil down to: 1. Spark checkpoints can't be recovered if you upgrade code 2. Some Spark transformations involve a shuffle, which can repartition data It's not accurate to imply that either one of those things are inherently cons of the direct stream api. Regarding checkpoints, nothing about the direct stream requires you to use checkpoints. You can save offsets in a checkpoint, your own database, or not save offsets at all (as James wants). One might even say that the direct stream api is . . . flexible . . . in that regard. Regarding partitions, the direct stream api gives you the same ordering guarantee as Kafka, namely that within a
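For anyone following along, a minimal Scala sketch of the direct (receiver-less) API discussed above, including reading the per-partition offset ranges so they can be stored wherever and in whatever order you choose. Broker addresses, the topic name, the rate limit, and the batch interval are placeholders (Spark 1.3 spark-streaming-kafka artifact):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val conf = new SparkConf()
  .setAppName("DirectKafkaExample")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000") // optional per-partition rate limit

val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset" -> "largest") // "smallest" replays everything still retained in Kafka

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))

stream.foreachRDD { rdd =>
  // one Spark partition per Kafka partition; offsets are exposed per partition
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd, then save offsetRanges yourself (database, ZooKeeper, ...) if needed ...
  offsetRanges.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}"))
}

ssc.start()
ssc.awaitTermination()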
Re: Spark and Flink
I guess it's a typo: eu.stratosphere should be replaced by org.apache.flink On Tue, May 19, 2015 at 1:13 PM, Alexander Alexandrov alexander.s.alexand...@gmail.com wrote: We managed to do this with the following config: // properties !-- Hadoop -- hadoop.version2.2.0/hadoop.version !-- Flink -- flink.version0.9-SNAPSHOT/flink.version !-- Spark -- spark.version1.2.1/spark.version // form the dependency management !-- Hadoop -- dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-common/artifactId version${hadoop.version}/version scopeprovided/scope /dependency dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-hdfs/artifactId version${hadoop.version}/version scopeprovided/scope /dependency !-- Flink -- dependency groupIdeu.stratosphere/groupId artifactIdflink-scala/artifactId version${flink.version}/version scopeprovided/scope /dependency dependency groupIdeu.stratosphere/groupId artifactIdflink-java/artifactId version${flink.version}/version scopeprovided/scope /dependency dependency groupIdeu.stratosphere/groupId artifactIdflink-clients/artifactId version${flink.version}/version scopeprovided/scope /dependency !-- Spark -- dependency groupIdorg.apache.spark/groupId artifactIdspark-core_${scala.tools.version}/artifactId version${spark.version}/version scopeprovided/scope /dependency !-- Jetty -- dependency groupIdorg.eclipse.jetty/groupId artifactIdjetty-util/artifactId version${jetty.version}/version /dependency dependency groupIdorg.eclipse.jetty/groupId artifactIdjetty-servlet/artifactId version${jetty.version}/version /dependency // actual dependencies !-- Spark -- dependency groupIdorg.apache.spark/groupId artifactIdspark-core_${scala.tools.version}/artifactId /dependency !-- Flink -- dependency groupIdeu.stratosphere/groupId artifactIdflink-scala/artifactId /dependency dependency groupIdeu.stratosphere/groupId artifactIdflink-java/artifactId /dependency dependency groupIdeu.stratosphere/groupId artifactIdflink-clients/artifactId /dependency !-- FIXME: this is a hacky solution for a Flink issue with the Jackson deps-- dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-core/artifactId version2.2.1/version scopeprovided/scope /dependency dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-databind/artifactId version2.2.1/version scopeprovided/scope /dependency dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-annotations/artifactId version2.2.1/version scopeprovided/scope /dependency 2015-05-19 10:06 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com: it's sound good, maybe you can send me pseudo structure, that is my fist maven project. best regards, paul 2015-05-18 14:05 GMT+02:00 Robert Metzger rmetz...@apache.org: Hi, I would really recommend you to put your Flink and Spark dependencies into different maven modules. Having them both in the same project will be very hard, if not impossible. Both projects depend on similar projects with slightly different versions. I would suggest a maven module structure like this: yourproject-parent (a pom module) -- yourproject-common -- yourproject-flink -- yourproject-spark On Mon, May 18, 2015 at 10:00 AM, Pa Rö paul.roewer1...@googlemail.com wrote: hi, if i add your dependency i get over 100 errors, now i change the version number: dependencies dependency groupIdcom.fasterxml.jackson.module/groupId artifactIdjackson-module-scala_2.10/artifactId version2.4.4/version exclusions exclusion groupIdcom.google.guava/groupId artifactIdguava/artifactId /exclusion
Hive in IntelliJ
I was trying to implement this example: http://spark.apache.org/docs/1.3.1/sql-programming-guide.html#hive-tables It worked well when I built Spark in the terminal using the command specified here: http://spark.apache.org/docs/1.3.1/building-spark.html#building-with-hive-and-jdbc-support But when I try to do the same in IntelliJ, following the instructions at https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IntelliJ it throws the error: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) console:21: error: object hive is not a member of package org.apache.spark.sql Can anyone help me get past this issue? Regards Akhil
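The usual cause of "object hive is not a member of package org.apache.spark.sql" is simply that the spark-hive module is not on the project classpath: inside the Spark source tree that typically means the hive (and hive-thriftserver) Maven profiles were not enabled when the project was imported into IntelliJ, and in a standalone project it means the spark-hive artifact was never declared. A minimal sbt sketch for the standalone case (version numbers are assumptions):

// build.sbt (sketch): HiveContext lives in the separate spark-hive module
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.3.1" % "provided"
)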
Re: Spark SQL on large number of columns
Hi, Tested with HiveContext also. It also take similar amount of time. To make the things clear, the following is select clause for a given column *aggregateStats( $columnName , max( cast($columnName as double)), |min(cast($columnName as double)), avg(cast($columnName as double)), count(*) )* aggregateStats is UDF generating case class to hold the values. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 5:57 PM, madhu phatak phatak@gmail.com wrote: Hi, Tested for calculating values for 300 columns. Analyser takes around 4 minutes to generate the plan. Is this normal? Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com wrote: Hi, I am using spark 1.3.1 Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote: And which version are you using 发自我的 iPhone 在 2015年5月19日,18:29,ayan guha guha.a...@gmail.com 写道: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying run spark sql aggregation on a file with 26k columns. No of rows is very small. I am running into issue that spark is taking huge amount of time to parse the sql and create a logical plan. Even if i have just one row, it's taking more than 1 hour just to get pass the parsing. Any idea how to optimize in these kind of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Re: Reading Real Time Data only from Kafka
If those questions aren't answered by https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md please let me know so I can update it. If you set auto.offset.reset to largest, it will start at the largest offset. Any messages before that will be skipped, so if prior runs of the job didn't consume them, they're lost. KafkaRDD / DirectStream doesn't make any scheduling decisions (aside from a locality hint if you have kafka running on the same node as spark), and it doesn't have any long-running receivers. Executors get whatever partitions the normal scheduler decides they should get. If an executor fails, a different executor reads the offset range for the failed partition; they're immutable, so no difference in result. Deciding where to save offsets (or not) is up to you. You can checkpoint, or store them yourself. On Mon, May 18, 2015 at 12:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I have played a bit with the directStream kafka api. Good work cody. These are my findings and also can you clarify a few things for me (see below). - When auto.offset.reset- smallest and you have 60GB of messages in Kafka, it takes forever as it reads the whole 60GB at once. largest will only read the latest messages. - To avoid this, you can actually limit the rate with spark.streaming.kafka.maxRatePerPartition, which is pretty stable (Always reads the same amount of data). - Number of partitions per batch = number of kafka partitions. - In the case of driver failures, offset reset being set to smallest will replay the whole messages and largest will only read those messages which are pushed after the streaming job has started. What happens to those messages which arrive in between? *Few things which are unclear:* - If we have a kafka topic with 9 partitions, and spark cluster with 3 slaves, how does it decides which slave should read from which partition? And what happens if a single slave fails while reading the data? - By default it doesn't push the offsets of messages which are read anywhere, then how does it replay the message in case of failures? Thanks Best Regards On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger c...@koeninger.org wrote: You linked to a google mail tab, not a public archive, so I don't know exactly which conversation you're referring to. As far as I know, streaming only runs a single job at a time in the order they were defined, unless you turn on an experimental option for more parallelism (TD or someone more knowledgeable can chime in on this). If you're talking about the possibility of the next job starting before the prior one has fully finished, because your processing is lagging behind... I'm not 100% sure this is possible because I've never observed it. The thing is, it's a moot point, because if you're saving offsets yourself transactionally, you already need to be verifying that offsets are correct (increasing without gaps) in order to handle restarts correctly. If you're super concerned about how batches get generated, the direct api gives you access to KafkaUtils.createRDD... just schedule your own rdds in the order you want. Again, flexible. On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Thanks Cody for your email. I think my concern was not to get the ordering of message within a partition , which as you said is possible if one knows how Spark works. The issue is how Spark schedule jobs on every batch which is not on the same order they generated. 
So if that is not guaranteed it does not matter if you manege order within your partition. So depends on par-partition ordering to commit offset may leads to offsets commit in wrong order. In this thread you have discussed this as well and some workaround : https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15 So again , one need to understand every details of a Consumer to take a decision if that solves their use case. Regards, Dibyendu On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org wrote: As far as I can tell, Dibyendu's cons boil down to: 1. Spark checkpoints can't be recovered if you upgrade code 2. Some Spark transformations involve a shuffle, which can repartition data It's not accurate to imply that either one of those things are inherently cons of the direct stream api. Regarding checkpoints, nothing about the direct stream requires you to use checkpoints. You can save offsets in a checkpoint, your own database, or not save offsets at all (as James wants). One might even say that the direct stream api is . . . flexible . . . in that regard. Regarding partitions, the direct stream api gives you the same ordering guarantee as Kafka, namely that within a given partition messages will be in increasing offset order. Clearly if you do a transformation that repartitions the stream, that no longer holds. Thing
Re: spark streaming doubt
So for Kafka+spark streaming, Receiver based streaming used highlevel api and non receiver based streaming used low level api. 1.In high level receiver based streaming does it registers consumers at each job start(whenever a new job is launched by streaming application say at each second)? 2.No of executors in highlevel receiver based jobs will always equal to no of partitions in topic ? 3.Will data from a single topic be consumed by executors in parllel or only one receiver consumes in multiple threads and assign to executors in high level receiver based approach ? On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: spark.streaming.concurrentJobs takes an integer value, not boolean. If you set it as 2 then 2 jobs will run parallel. Default value is 1 and the next job will start once it completes the current one. Actually, in the current implementation of Spark Streaming and under default configuration, only job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then then next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs which is by default set to 1. Its not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources and which can make it hard to debug the whether there is sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future. Copied from TD's answer written in SO http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Non-receiver based streaming for example you can say are the fileStream, directStream ones. You can read a bit of information from here https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html Thanks Best Regards On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true. Will the all pending jobs starts one by one after 1 jobs completes,or it does not creates jobs which could not be started at its desired interval. And Whats the difference and usage of Receiver vs non-receiver based streaming. Is there any documentation for that? On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It will be a single job running at a time by default (you can also configure the spark.streaming.concurrentJobs to run jobs parallel which is not recommended to put in production). Now, your batch duration being 1 sec and processing time being 2 minutes, if you are using a receiver based streaming then ideally those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in block not found exceptions as spark drops some blocks which are yet to process to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold your data then it will run smoothly without any issues. 
Thanks Best Regards On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote: What happnes if in a streaming application one job is not yet finished and stream interval reaches. Does it starts next job or wait for first to finish and rest jobs will keep on accumulating in queue. Say I have a streaming application with stream interval of 1 sec, but my job takes 2 min to process 1 sec stream , what will happen ? At any time there will be only one job running or multiple ?
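For reference, the two knobs being discussed above are just the batch interval passed to the StreamingContext and the experimental spark.streaming.concurrentJobs property; a tiny Scala sketch with illustrative values:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("StreamingConcurrencyExample")
  // experimental: let jobs from 2 batches run at the same time (default is 1)
  .set("spark.streaming.concurrentJobs", "2")

// 1-second batches; with the default of 1 concurrent job, batches that take longer
// than 1 second to process simply queue up behind the one that is running
val ssc = new StreamingContext(conf, Seconds(1))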
Re: PySpark Job throwing IOError
Hello all, I have an error in pyspark for which I have not the faintest idea of the cause. All I can tell from the stack trace is that it can't find a pyspark file on the path /mnt/spark-*/pyspark-*. Apart from that I need someone more experienced than me with Spark to look into it and help diagnose the problem and suggest potential solutions, hence I am looking to this group for help. If anyone wants to read the same question on Stack Overflow here is the link: http://stackoverflow.com/questions/30328104/pyspark-job-throwing-ioerror Here's the same thing pasted as raw text: I am trying to write a simple KNN job using pyspark on a hdfs cluster. I am using very few input files to perform the job so I don't think it's a memory (space). I do not do a broadcast in any part of my code. So it is surprising to me when the broadcast.py fails? I however do have python dictionaries that I have in shared memory without explicitly doing a broadcast. Can anyone help me understand what is going on? I have appended my python file and the stack trace to this email. Thanks, Nikhil from pyspark.mllib.linalg import SparseVector from pyspark import SparkContext import glob import sys import time import subprocess from itertools import combinations We create user and item indices starting from 0 to #users and 0 to #items respectively. This is done to store them in sparseVectors as dicts. def create_indices(inputdir): items=dict() user_id_to_idx=dict() user_idx_to_id=dict() item_idx_to_id=dict() item_id_to_idx=dict() item_idx=0 user_idx=0 cat=subprocess.Popen([hadoop,fs,-cat,/user/hadoop/+inputdir+/*.txt],stdout=subprocess.PIPE) for line in cat.stdout: toks=map(str,line.strip().split(\t)) try: user_id_to_idx[toks[1].strip()] except KeyError: if toks[1].strip()!=None: user_id_to_idx[toks[1].strip()]=user_idx user_idx_to_id[user_idx]=toks[1].strip() user_idx+=1 try: item_id_to_idx[toks[0].strip()] except KeyError: if toks[0].strip()!=None: item_id_to_idx[toks[0].strip()]=item_idx item_idx_to_id[item_idx]=toks[0].strip() item_idx+=1 return user_idx_to_id,user_id_to_idx,item_idx_to_id,item_id_to_idx,user_idx,item_idx def concat_helper(a,b): if(a!= None and b!=None): print a,b,a.update(b) temp=dict() temp.update(a) temp.update(b) return temp elif a!=None: return a elif b!=None: return b # pass in the hdfs path to the input files and the spark context. def runKNN(inputdir,sc,user_id_to_idx,item_id_to_idx): rdd_text=sc.textFile(inputdir) try: new_rdd = rdd_text.map(lambda x: (item_id_to_idx[str(x.strip().split(\t)[0])],{user_id_to_idx[str(x.strip().split(\t)[1])]:1})).reduceByKey(lambda x,y: concat_helper(x,y)).sortByKey() except KeyError: print item_id_to_idx.keys() pass return new_rdd if __name__==__main__: sc = SparkContext() u_idx_to_id,u_id_to_idx,i_idx_to_id,i_id_to_idx,u_idx,i_idx=create_indices(sys.argv[1]) u_idx_to_id_b=sc.broadcast(u_idx_to_id) u_id_to_idx_b=sc.broadcast(u_id_to_idx) i_idx_to_idx_b=sc.broadcast(i_idx_to_id) i_id_to_idx_b=sc.broadcast(i_id_to_idx) num_users=sc.broadcast(u_idx) num_items=sc.broadcast(i_idx) item_dict_rdd=runKNN(sys.argv[1],sc,u_id_to_idx,i_id_to_idx) item_dict_rdd_new=item_dict_rdd.map(lambda x: (x[0],SparseVector(i_idx,x[1]))) item_dict_rdd_new.saveAsTextFile(hdfs://output_path) #dot_products_rdd=map(lambda (x,y): (x,y),combinations(item_dict_rdd_new.map(lambda x: x),2)) dot_products_rdd.saveAsTextFile(hdfs://output_path_2) stacktrace Description: stacktrace - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark streaming doubt
On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote: So for Kafka+spark streaming, Receiver based streaming used highlevel api and non receiver based streaming used low level api. 1.In high level receiver based streaming does it registers consumers at each job start(whenever a new job is launched by streaming application say at each second)? - Receiver based streaming will always have the receiver running parallel while your job is running, So by default for every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka. 2.No of executors in highlevel receiver based jobs will always equal to no of partitions in topic ? - Not sure from where did you came up with this. For the non stream based one, i think the number of partitions in spark will be equal to the number of kafka partitions for the given topic. 3.Will data from a single topic be consumed by executors in parllel or only one receiver consumes in multiple threads and assign to executors in high level receiver based approach ? - They will consume the data parallel. For the receiver based approach, you can actually specify the number of receiver that you want to spawn for consuming the messages. On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: spark.streaming.concurrentJobs takes an integer value, not boolean. If you set it as 2 then 2 jobs will run parallel. Default value is 1 and the next job will start once it completes the current one. Actually, in the current implementation of Spark Streaming and under default configuration, only job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then then next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs which is by default set to 1. Its not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources and which can make it hard to debug the whether there is sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future. Copied from TD's answer written in SO http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Non-receiver based streaming for example you can say are the fileStream, directStream ones. You can read a bit of information from here https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html Thanks Best Regards On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true. Will the all pending jobs starts one by one after 1 jobs completes,or it does not creates jobs which could not be started at its desired interval. And Whats the difference and usage of Receiver vs non-receiver based streaming. Is there any documentation for that? On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It will be a single job running at a time by default (you can also configure the spark.streaming.concurrentJobs to run jobs parallel which is not recommended to put in production). 
Now, your batch duration being 1 sec and processing time being 2 minutes, if you are using a receiver based streaming then ideally those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in block not found exceptions as spark drops some blocks which are yet to process to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold your data then it will run smoothly without any issues. Thanks Best Regards On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote: What happnes if in a streaming application one job is not yet finished and stream interval reaches. Does it starts next job or wait for first to finish and rest jobs will keep on accumulating in queue. Say I have a streaming application with stream interval of 1 sec, but my job takes 2 min to process 1 sec stream , what will happen ? At any time there will be only one job running or multiple ?
Multi user setup and saving a DataFrame / RDD to a network exported file system
Dear Experts, we have a spark cluster (standalone mode) in which master and workers are started from root account. Everything runs correctly to the point when we try doing operations such as dataFrame.select(name, age).save(ofile, parquet) or rdd.saveAsPickleFile(ofile) , where ofile is path on a network exported filesystem (visible on all nodes, in our case this is lustre, I guess on nfs effect would be similar). Unsurprisingly temp files created on workers are owned by root, which then leads to a crash (see [1] below). Is there a solution/workaround for this (e.g. controlling file creation mode of the temporary files)? Cheers, Tomasz ps I've tried to google this problem, couple of similar reports, but no clear answer/solution found ps2 For completeness - running master/workers as a regular user solves the problem only for the given user. For other users submitting to this master the result is given in [2] below [0] Cluster details: Master/workers: centos 6.5 Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build) [1] ## File /mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o27.save. : java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet; isDirectory=false; length=534; replication=1; blocksize=33554432; modification_time=1432042832000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1181) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) ## [2] ## 15/05/19 14:45:19 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, wn23023.cis.gov.pl): java.io.IOException: Mkdirs failed to create file:/mnt/lustre/bigdata/med_home/tmp/test18/namesAndAges.parquet2/_temporary/0/_temporary/attempt_201505191445_0009_r_00_0 at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438) at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784) at parquet.hadoop.ParquetFileWriter.init(ParquetFileWriter.java:154) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:279) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:667) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at
Re: Wish for 1.4: upper bound on # tasks in Mesos
Hey Tom, Are you using the fine-grained or coarse-grained scheduler? For the coarse-grained scheduler, there is a spark.cores.max config setting that will limit the total # of cores it grabs. This was there in earlier versions too. Matei On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote: I read the other day that there will be a fair number of improvements in 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a configurable limit for the number of tasks for jobs run on Mesos ? This would be a very simple yet effective way to prevent a job dominating the cluster. cheers, Tom - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
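A small sketch of the coarse-grained setting Matei mentions (the master URL and core count are placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MesosCappedJob")
  .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos")
  .set("spark.mesos.coarse", "true") // coarse-grained mode
  .set("spark.cores.max", "48")      // upper bound on the total cores this job will grab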
PanTera Big Data Visualization built with Spark
Hi, Can you please add us to the list of Spark Users Org: PanTera URL: http://pantera.io Components we are using: - PanTera uses a direct access to the Spark Scala API - Spark Core SparkContext, JavaSparkContext, SparkConf, RDD, JavaRDD, - Accumulable, AccumulableParam, Accumulator, AccumulatorParam, - StorageLevel, Broadcast, HashPartitioner, Logging, KryoRegistrator, - NewHadoopRDD, UnionRDD - We also use SharedSparkContext for testing, but we've made a copy - SparkSQL SQLContext, JavaSQLContext, ByteType, IntType, LongType, - FloatType, DoubleType, BooleanType, StringType, ArrayType, - TimestampType, StructField, StructType, Row, GenericRow - Spark Streaming We have done so in the past, and will again in the future, - but not currently supported (when used: StreamingContext, DStream, Time) - GraphX Graph, PartitionStrategy, Edge, EdgeTriplet, VertexRDD, - graphToGraphOps Use Case: PanTera is a tool for exploring large datasets. It uses Spark to create XY and geographic scatterplots from millions to billions of datapoints. Please let me know if you have any questions. Thanks very much, Cyrus Handy PanTera http://pantera.io Product Manager Uncharted™ http://uncharted.software (Formerly Oculus Info Inc.) Direct: 416-203-3003 x232 Mobile: 416-821-3025
Re: Wish for 1.4: upper bound on # tasks in Mesos
I'm using fine-grained for a multi-tenant environment which is why I would welcome the limit of tasks per job :) cheers, Tom On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Tom, Are you using the fine-grained or coarse-grained scheduler? For the coarse-grained scheduler, there is a spark.cores.max config setting that will limit the total # of cores it grabs. This was there in earlier versions too. Matei On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote: I read the other day that there will be a fair number of improvements in 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a configurable limit for the number of tasks for jobs run on Mesos ? This would be a very simple yet effective way to prevent a job dominating the cluster. cheers, Tom
Wish for 1.4: upper bound on # tasks in Mesos
I read the other day that there will be a fair number of improvements in 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a configurable limit for the number of tasks for jobs run on Mesos ? This would be a very simple yet effective way to prevent a job dominating the cluster. cheers, Tom
Code error
Hi, Can anybody see what's wrong in this piece of code:

./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
val data = sc.textFile("/user/p_loadbd/fraude5.csv").map(x => x.toLowerCase.split(',')).map(x => x(0) + "," + x(1))
val header = data.first()
val filter_data = data.filter(x => x != header)
val parsedData = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

Thanks.

Ricardo Goncalves da Silva Lead Data Scientist | Seção de Desenvolvimento de Sistemas de Business Intelligence - Projetos de Inovação | IDPB02 Av. Eng. Luis Carlos Berrini, 1.376 - 7º - 04571-000 - SP ricardog.si...@telefonica.com | www.telefonica.com.br Tel +55 11 3430 4955 | Cel +55 11 94292 9526
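Judging only from the code as posted, the most likely problem is that parsedData is built from data rather than from filter_data, so the header row is still present and its first field fails at toDouble. A hedged sketch of the corrected parsing, assuming a two-column numeric CSV with a header row:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("/user/p_loadbd/fraude5.csv")
  .map(x => x.toLowerCase.split(','))
  .map(x => x(0) + "," + x(1))

val header = data.first()
val filteredData = data.filter(x => x != header) // drop the header row

// parse the filtered RDD, not the original one that still contains the header
val parsedData = filteredData
  .map(s => Vectors.dense(s.split(',').map(_.toDouble)))
  .cache()

val clusters = KMeans.train(parsedData, 2, 20)
println("Within Set Sum of Squared Errors = " + clusters.computeCost(parsedData))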
Re: spark streaming doubt
Just to add, there is a Receiver based Kafka consumer which uses Kafka Low Level Consumer API. http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Regards, Dibyendu On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote: So for Kafka+spark streaming, Receiver based streaming used highlevel api and non receiver based streaming used low level api. 1.In high level receiver based streaming does it registers consumers at each job start(whenever a new job is launched by streaming application say at each second)? - Receiver based streaming will always have the receiver running parallel while your job is running, So by default for every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka. 2.No of executors in highlevel receiver based jobs will always equal to no of partitions in topic ? - Not sure from where did you came up with this. For the non stream based one, i think the number of partitions in spark will be equal to the number of kafka partitions for the given topic. 3.Will data from a single topic be consumed by executors in parllel or only one receiver consumes in multiple threads and assign to executors in high level receiver based approach ? - They will consume the data parallel. For the receiver based approach, you can actually specify the number of receiver that you want to spawn for consuming the messages. On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: spark.streaming.concurrentJobs takes an integer value, not boolean. If you set it as 2 then 2 jobs will run parallel. Default value is 1 and the next job will start once it completes the current one. Actually, in the current implementation of Spark Streaming and under default configuration, only job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then then next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs which is by default set to 1. Its not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources and which can make it hard to debug the whether there is sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future. Copied from TD's answer written in SO http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Non-receiver based streaming for example you can say are the fileStream, directStream ones. You can read a bit of information from here https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html Thanks Best Regards On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true. Will the all pending jobs starts one by one after 1 jobs completes,or it does not creates jobs which could not be started at its desired interval. And Whats the difference and usage of Receiver vs non-receiver based streaming. Is there any documentation for that? 
On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It will be a single job running at a time by default (you can also configure the spark.streaming.concurrentJobs to run jobs parallel which is not recommended to put in production). Now, your batch duration being 1 sec and processing time being 2 minutes, if you are using a receiver based streaming then ideally those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in block not found exceptions as spark drops some blocks which are yet to process to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold your data then it will run smoothly without any issues. Thanks Best Regards On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote: What happnes if in a streaming application one job is not yet finished and stream interval reaches. Does it starts next job or wait for first to finish and rest jobs will keep on accumulating in queue. Say I have a streaming application with stream interval of 1 sec, but my job takes 2 min to process 1 sec stream , what will happen ? At any time there will be only one job running or
Re: Decision tree: categorical variables
Hi Keerthi As Xiangrui mentioned in the reply, the categorical variables are assumed to be encoded as integers between 0 and k - 1, if k is the parameter you are passing as the category info map. So you will need to handle this during parsing (your columns 3 and 6 need to be converted into ints in the right range) Ram On Tue, May 19, 2015 at 5:45 AM, Keerthi keerthi.reddy1...@gmail.com wrote: Hi , can you pls share how you resolved the parsing issue. It would be of great help... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-categorical-variables-tp12433p22943.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
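A hedged MLlib sketch of what Ram describes: the categorical columns are mapped to integers 0..k-1 during parsing, and their arities are declared in categoricalFeaturesInfo. The file path, the arities (4 and 3), and the label position are illustrative assumptions; the feature indices 3 and 6 follow the thread:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree

// Assumes the label is in column 0 and every feature column has already been
// turned into a Double, with the categorical ones encoded as 0, 1, ..., k-1.
val data = sc.textFile("hdfs:///tmp/records.csv").map { line =>
  val cols = line.split(',')
  LabeledPoint(cols(0).toDouble, Vectors.dense(cols.drop(1).map(_.toDouble)))
}

// feature index -> number of categories; features 3 and 6 are the categorical ones
val categoricalFeaturesInfo = Map(3 -> 4, 6 -> 3)

val model = DecisionTree.trainClassifier(
  data, 2, categoricalFeaturesInfo, "gini", 5, 32) // numClasses, impurity, maxDepth, maxBins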
Re: Broadcast variables can be rebroadcast?
Thanks Imran. It does help clarify. I believe I had it right all along then but was confused by documentation talking about never changing the broadcasted variables. I've tried it on a local mode process till now and does seem to work as intended. When (and if !) we start running on a real cluster, I hope this holds up. Thanks NB On Tue, May 19, 2015 at 6:25 AM, Imran Rashid iras...@cloudera.com wrote: hmm, I guess it depends on the way you look at it. In a way, I'm saying that spark does *not* have any built in auto-re-broadcast if you try to mutate a broadcast variable. Instead, you should create something new, and just broadcast it separately. Then just have all the code you have operating on your RDDs look at the new broadcast variable. But I guess there is another way to look at it -- you are creating new broadcast variables each time, but they all point to the same underlying mutable data structure. So in a way, you are rebroadcasting the same underlying data structure. Let me expand my example from earlier a little bit more: def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = { ... } // this is a val, because the data structure itself is mutable val myMutableDataStructue = ... // this is a var, because you will create new broadcasts var myBroadcast = sc.broadcast(myMutableDataStructure) (0 to 20).foreach { iteration = oneIteration(myRDD, myBroadcast) // update your mutable data structure in place myMutableDataStructure.update(...) // ... but that doesn't effect the broadcast variables living out on the cluster, so we need to // create a new one // this line is not required -- the broadcast var will automatically get unpersisted when a gc // cleans up the old broadcast on the driver, but I'm including this here for completeness, // in case you want to more proactively clean up old blocks if you are low on space myBroadcast.unpersist() // now we create a new broadcast which has the updated data in our mutable data structure myBroadcast = sc.broadcast(myMutableDataStructure) } hope this clarifies things! Imran On Tue, May 19, 2015 at 3:06 AM, N B nb.nos...@gmail.com wrote: Hi Imran, If I understood you correctly, you are suggesting to simply call broadcast again from the driver program. This is exactly what I am hoping will work as I have the Broadcast data wrapped up and I am indeed (re)broadcasting the wrapper over again when the underlying data changes. However, documentation seems to suggest that one cannot re-broadcast. Is my understanding accurate? Thanks NB On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com wrote: Rather than updating the broadcast variable, can't you simply create a new one? When the old one can be gc'ed in your program, it will also get gc'ed from spark's cache (and all executors). I think this will make your code *slightly* more complicated, as you need to add in another layer of indirection for which broadcast variable to use, but not too bad. Eg., from var myBroadcast = sc.broadcast( ...) (0 to 20).foreach{ iteration = // ... some rdd operations that involve myBroadcast ... myBroadcast.update(...) // wrong! dont' update a broadcast variable } instead do something like: def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = { ... } var myBroadcast = sc.broadcast(...) (0 to 20).foreach { iteration = oneIteration(myRDD, myBroadcast) var myBroadcast = sc.broadcast(...) 
// create a NEW broadcast here, with whatever you need to update it } On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote: Thanks Ayan. Can we rebroadcast after updating in the driver? Thanks NB. On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote: Hi broadcast variables are shipped for the first time it is accessed in a transformation to the executors used by the transformation. It will NOT updated subsequently, even if the value has changed. However, a new value will be shipped to any new executor comes into play after the value has changed. This way, changing value of broadcast variable is not a good idea as it can create inconsistency within cluster. From documentatins: In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote: Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer. If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using
Re: Reading Binary files in Spark program
Try something like: JavaPairRDDIntWritable, Text output = sc.newAPIHadoopFile(inputDir, org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class, Text.class, new Job().getConfiguration()); With the type of input format that you require. Thanks Best Regards On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com wrote: Hi Team, I am new to Spark and learning. I am trying to read image files into spark job. This is how I am doing: Step 1. Created sequence files with FileName as Key and Binary image as value. i.e. Text and BytesWritable. I am able to read these sequence files into Map Reduce programs. Step 2. I understand that Text and BytesWritable are Non Serializable therefore, I read the sequence file in Spark as following: SparkConf sparkConf = new SparkConf().setAppName(JavaSequenceFile); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaPairRDDString, Byte seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class) ; final ListTuple2lt;String, Byte tuple2s = seqFiles.collect(); The moment I try to call collect() method to get the keys of sequence file, following exception has been thrown Can any one help me understanding why collect() method is failing? If I use toArray() on seqFiles object then also I am getting same call stack. Regards Tapan java.io.NotSerializableException: org.apache.hadoop.io.Text at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 2015-05-19 15:15:03,705 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not retrying 2015-05-19 15:15:03,731 INFO [task-result-getter-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all completed, from pool 2015-05-19 15:15:03,739 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0 2015-05-19 15:15:03,747 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 0 failed: collect at JavaSequenceFile.java:44, took 4.421397 s Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at
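A minimal sketch of the usual workaround, not from the thread: keep the Hadoop Writable types inside the transformation and convert them to plain serializable types before collect(), so nothing non-serializable is shipped back to the driver. The object and app names are made up; only the sequence-file key/value types mirror Tapan's setup.

  import java.util.Arrays
  import org.apache.hadoop.io.{BytesWritable, Text}
  import org.apache.spark.{SparkConf, SparkContext}

  object SequenceFileCollect {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("SequenceFileCollect"))
      // Text / BytesWritable stay on the executors ...
      val images = sc.sequenceFile(args(0), classOf[Text], classOf[BytesWritable])
        .map { case (name, bytes) =>
          // ... and only plain String / Array[Byte] values are collected.
          (name.toString, Arrays.copyOf(bytes.getBytes, bytes.getLength))
        }
      images.collect().foreach { case (name, data) => println(s"$name: ${data.length} bytes") }
      sc.stop()
    }
  }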
Re: group by and distinct performance issue
You may want to look at this tooling for helping identify performance issues and bottlenecks: https://github.com/kayousterhout/trace-analysis I believe this is slated to become part of the web UI in the 1.4 release; in fact, based on the status of the JIRA, https://issues.apache.org/jira/browse/SPARK-6418, it looks like it is complete. On Tue, May 19, 2015 at 3:56 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Peer, If you open the driver UI (running on port 4040) you can see the stages and the tasks happening inside it. The best way to identify the bottleneck for a stage is to see if there's any time being spent on GC, and how many tasks there are per stage (it should be a number >= the total # of cores to achieve max parallelism). You can also see how long each task takes, etc. Thanks Best Regards On Tue, May 19, 2015 at 12:58 PM, Peer, Oded oded.p...@rsa.com wrote: I am running Spark over Cassandra to process a single table. My task reads a single day's worth of data from the table and performs 50 group by and distinct operations, counting distinct userIds by different grouping keys. My code looks like this: JavaRDD<Row> rdd = sc.parallelize().mapPartitions().cache() // reads the data from the table for each groupingKey { JavaPairRDD<GroupingKey, UserId> groupByRdd = rdd.mapToPair(); JavaPairRDD<GroupingKey, Long> countRdd = groupByRdd.distinct().mapToPair().reduceByKey() // counts distinct values per grouping key } The distinct() stage takes about 2 minutes for every groupingKey, and my task takes well over an hour to complete. My cluster has 4 nodes and 30 GB of RAM per Spark process, and the table size is 4 GB. How can I identify the bottleneck more accurately? Is it caused by shuffling data? How can I improve the performance? Thanks, Oded
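If the distinct() + reduceByKey() pattern itself turns out to be the bottleneck rather than GC, one possible restructuring (a sketch, not from the thread; the key/value types are stand-ins for GroupingKey/UserId) is to do the distinct count in a single aggregateByKey pass, or to accept an approximate count via countApproxDistinctByKey:

  // pairs of (groupingKey, userId), as produced by Oded's mapToPair step
  val pairs: org.apache.spark.rdd.RDD[(String, Long)] = ???

  // Exact distinct count in one shuffle instead of distinct() followed by reduceByKey()
  val exactCounts = pairs
    .aggregateByKey(scala.collection.mutable.HashSet.empty[Long])(
      (set, id) => { set += id; set },   // add each id to a partition-local set
      (a, b)    => { a ++= b; a })       // merge sets coming from different partitions
    .mapValues(_.size.toLong)

  // Approximate alternative (HyperLogLog-based), much cheaper when key groups are huge
  val approxCounts = pairs.countApproxDistinctByKey(relativeSD = 0.05)

The exact variant materializes one set per key, so it only pays off when the number of distinct userIds per grouping key is modest.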
Re: How to use spark to access HBase with Security enabled
Please take a look at: http://hbase.apache.org/book.html#_client_side_configuration_for_secure_operation Cheers On Tue, May 19, 2015 at 5:23 AM, donhoff_h 165612...@qq.com wrote: The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my spark programs. I am sure I have run the kinit command to make it take effect. And I also used the HBase shell to verify that this user has the right to scan and put the tables in HBase. Now I still have no idea how to solve this problem. Can anybody help me to figure it out? Many Thanks! -- Original Message -- *From:* yuzhihong yuzhih...@gmail.com *Sent:* Tuesday, May 19, 2015, 7:55 PM *To:* donhoff_h 165612...@qq.com *Cc:* user user@spark.apache.org *Subject:* Re: How to use spark to access HBase with Security enabled Which user did you run your program as? Have you granted the proper permissions on the HBase side? You should also check the master log to see if there is some clue. Cheers On May 19, 2015, at 2:41 AM, donhoff_h 165612...@qq.com wrote: Hi, experts. I ran the HBaseTest program, which is an example from the Apache Spark source code, to learn how to use Spark to access HBase. But I met the following exception: Exception in thread "main" org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0 I also checked the RegionServer log of the host bgdt01.dev.hrb listed in the above exception. I found a few entries like the following one: 2015-05-19 16:59:11,143 DEBUG [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: RpcServer.listener,port=16020: Caught exception while reading:Authentication is required The above entry did not point to my program clearly, but the time is very near. Since my HBase version is HBase 1.0.0 and I set security enabled, I suspect the exception was caused by Kerberos authentication, but I am not sure. Does anybody know if my guess is right? And if I am right, could anybody tell me how to set up Kerberos authentication in a Spark program? I don't know how to do it. I already checked the API doc, but did not find anything useful. Many Thanks! By the way, my Spark version is 1.3.0. I also paste the code of HBaseTest in the following: ***Source Code** object HBaseTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseTest") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, args(0)) // Initialize hBase table if necessary val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(args(0))) { val tableDesc = new HTableDescriptor(args(0)) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.count() sc.stop() } }
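For reference, a hedged sketch of the client-side Kerberos setup the HBase guide above describes. The property names come from that guide; the principal and keytab path are placeholders, and executor-side authentication on a real cluster may need extra handling depending on the deployment mode:

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.security.UserGroupInformation

  val hbaseConf = HBaseConfiguration.create()          // picks up hbase-site.xml from the classpath
  hbaseConf.set("hbase.security.authentication", "kerberos")
  hbaseConf.set("hadoop.security.authentication", "kerberos")

  // Log in from a keytab instead of relying on the kinit ticket cache,
  // which remote executor JVMs cannot see.
  UserGroupInformation.setConfiguration(hbaseConf)
  UserGroupInformation.loginUserFromKeytab("spark/host@EXAMPLE.REALM", "/path/to/spark.keytab")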
Re: Wish for 1.4: upper bound on # tasks in Mesos
Yeah, this definitely seems useful there. There might also be some ways to cap the application in Mesos, but I'm not sure. Matei On May 19, 2015, at 1:11 PM, Thomas Dudziak tom...@gmail.com wrote: I'm using fine-grained for a multi-tenant environment which is why I would welcome the limit of tasks per job :) cheers, Tom On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Tom, Are you using the fine-grained or coarse-grained scheduler? For the coarse-grained scheduler, there is a spark.cores.max config setting that will limit the total # of cores it grabs. This was there in earlier versions too. Matei On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote: I read the other day that there will be a fair number of improvements in 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a configurable limit for the number of tasks for jobs run on Mesos? This would be a very simple yet effective way to prevent a job dominating the cluster. cheers, Tom
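For completeness, the coarse-grained cap Matei mentions is just a config entry (a sketch; the app name, Mesos master URL, and the value 48 are placeholders):

  val conf = new org.apache.spark.SparkConf()
    .setAppName("capped-app")
    .setMaster("mesos://zk://zk1:2181/mesos")   // placeholder Mesos master
    .set("spark.cores.max", "48")               // upper bound on the total cores the app grabs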
Re: spark streaming doubt
Thanks Akhil and Dibyendu. In high-level receiver-based streaming, do the executors run on the receiver nodes themselves to get data locality? Or is the data always transferred to executor nodes, where the executor nodes may differ in each run of the job but the receiver nodes remain the same machines throughout the life of the streaming application unless a node failure happens? On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Just to add, there is a Receiver-based Kafka consumer which uses the Kafka Low Level Consumer API. http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Regards, Dibyendu On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote: So for Kafka+Spark streaming, receiver-based streaming uses the high-level API and non-receiver-based streaming uses the low-level API. 1. In high-level receiver-based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say at each second)? - Receiver-based streaming will always have the receiver running in parallel while your job is running, so by default every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka. 2. Will the number of executors in high-level receiver-based jobs always equal the number of partitions in the topic? - Not sure where you came up with this. For the non-receiver-based one, I think the number of partitions in Spark will be equal to the number of Kafka partitions for the given topic. 3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign to executors in the high-level receiver-based approach? - They will consume the data in parallel. For the receiver-based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages. On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it to 2 then 2 jobs will run in parallel. The default value is 1 and the next job will start once it completes the current one. Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future. Copied from TD's answer written on SO http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Non-receiver-based streaming would, for example, be the fileStream and directStream ones.
You can read a bit of information from here: https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html Thanks Best Regards On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all the pending jobs start one by one after the first job completes, or does it not create the jobs that could not be started at their desired interval? And what's the difference between, and usage of, receiver vs non-receiver based streaming? Is there any documentation for that? On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, with your batch duration being 1 sec and processing time being 2 minutes, if you are using receiver-based streaming then ideally those receivers will keep on receiving data while the job is running (the data will accumulate in memory if you set StorageLevel to MEMORY_ONLY, and you can end up with block-not-found exceptions as Spark drops blocks that are yet to be processed in order to accommodate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold your data then it will run smoothly without any issues. Thanks Best Regards On Tue, May 19, 2015 at
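To make the receiver vs non-receiver distinction above concrete, here is a minimal sketch of both creation calls with the Spark 1.3-era Kafka API (an existing SparkContext sc is assumed; the ZooKeeper/broker addresses, group, and topic names are placeholders):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val ssc = new StreamingContext(sc, Seconds(1))

  // Receiver-based: a long-running receiver keeps pulling data while jobs run,
  // cutting blocks every spark.streaming.blockInterval.
  val receiverStream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("my-topic" -> 1))

  // Direct (non-receiver): each batch reads an exact offset range per Kafka partition,
  // so the number of Spark partitions equals the number of Kafka partitions.
  val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("my-topic"))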
Re: Getting the best parameter set back from CrossValidatorModel
Hi Justin and Ram, To clarify, PipelineModel.stages is not private[ml]; only the PipelineModel constructor is private[ml]. So it's safe to use pipelineModel.stages as a Spark user. Ram's example looks good. Btw, in Spark 1.4 (and the current master build), we've made a number of improvements to Params and Pipelines, so this should become easier to use! Joseph On Sun, May 17, 2015 at 10:17 PM, Justin Yip yipjus...@prediction.io wrote: Thanks Ram. Your sample code is very helpful. (There is a minor bug in that PipelineModel.stages is hidden under private[ml]; it just needs a wrapper around it. :) Justin On Sat, May 16, 2015 at 10:44 AM, Ram Sriharsha sriharsha@gmail.com wrote: Hi Justin, The CrossValidatorExample here https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala is a good example of how to set up an ML Pipeline for extracting a model with the best parameter set. You set up the pipeline as shown here: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala#L73 This pipeline is treated as an estimator and wrapped into a CrossValidator to do grid search and return the model with the best parameters. Once you have trained the best model as in here https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala#L93 the result is a CrossValidatorModel which contains the best estimator (i.e. the best pipeline above), and you can extract the best pipeline and inquire its parameters as follows: // what are the best parameters? val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel] val stages = bestPipelineModel.stages val hashingStage = stages(1).asInstanceOf[HashingTF] println(hashingStage.getNumFeatures) val lrStage = stages(2).asInstanceOf[LogisticRegressionModel] println(lrStage.getRegParam) Ram On Sat, May 16, 2015 at 3:17 AM, Justin Yip yipjus...@prediction.io wrote: Hello, I am using MLPipeline. I would like to extract the best parameter found by CrossValidator. But I cannot find much documentation about how to do it. Can anyone give me some pointers? Thanks. Justin -- View this message in context: Getting the best parameter set back from CrossValidatorModel http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-best-parameter-set-back-from-CrossValidatorModel-tp22915.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: How to implement an Evaluator for a ML pipeline?
The documentation needs to be updated to state that higher metric values are better (https://issues.apache.org/jira/browse/SPARK-7740). I don't know why if you negate the return value of the Evaluator you still get the highest regularization parameter candidate. Maybe you should check the log messages from CrossValidator and see the average metric values during cross validation. -Xiangrui On Sat, May 9, 2015 at 12:15 PM, Stefan H. twel...@gmx.de wrote: Hello everyone, I am stuck with the (experimental, I think) API for machine learning pipelines. I have a pipeline with just one estimator (ALS) and I want it to try different values for the regularization parameter. Therefore I need to supply an Evaluator that returns a value of type Double. I guess this could be something like accuracy or mean squared error? The only implementation I found is BinaryClassificationEvaluator, and I did not understand the computation there. I could not find detailed documentation so I implemented a dummy Evaluator that just returns the regularization parameter: new Evaluator { def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = paramMap.get(als.regParam).getOrElse(throw new Exception) } I just wanted to see whether the lower or higher value wins. On the resulting model I inspected the chosen regularization parameter this way: cvModel.bestModel.fittingParamMap.get(als.regParam) And it was the highest of my three regularization parameter candidates. Strange thing is, if I negate the return value of the Evaluator, that line still returns the highest regularization parameter candidate. So I am probably working with false assumptions. I'd be grateful if someone could point me to some documentation or examples, or has a few hints to share. Cheers, Stefan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-an-Evaluator-for-a-ML-pipeline-tp22830.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
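For anyone hitting the same wall, here is a hedged sketch of a concrete Evaluator, using the same evaluate signature that compiles in Stefan's snippet (imports per Spark 1.3.x; the package of Evaluator moved in later versions, and the prediction/label column names are assumptions). It returns negative RMSE so that "higher is better" holds, which is what CrossValidator expects:

  import org.apache.spark.ml.Evaluator
  import org.apache.spark.ml.param.ParamMap
  import org.apache.spark.sql.DataFrame

  val rmseEvaluator = new Evaluator {
    def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = {
      val rmse = math.sqrt(
        dataset.select("prediction", "label").rdd
          .map(r => { val d = r.getDouble(0) - r.getDouble(1); d * d })
          .mean())
      -rmse  // negate so that the largest metric corresponds to the smallest error
    }
  }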
Add to Powered by Spark page
Hi, We would like to be added to the Powered by Spark list: organization name: Localytics URL: http://eng.localytics.com/ a list of which Spark components you are using: Spark, Spark Streaming, MLLib a short description of your use case: Batch, real-time, and predictive analytics driving our mobile app analytics and marketing automation product. thanks, M
Re: rdd.sample() methods very slow
The way these files are accessed is inherently sequential-access. There isn't a way, in general, to know where record N is in a file like this and jump to it. So they must be read to be sampled. On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Hi I have an RDD[Document] that contains 7 million objects and it is saved in the file system as an object file. I want to get a random sample of about 70 objects from it using the rdd.sample() method. It is very slow val rdd : RDD[Document] = sc.objectFile[Document]("C:/temp/docs.obj").sample(false, 0.1D, 0L).cache() val count = rdd.count() From the Spark UI, I see Spark is trying to read the entire object file at the folder “C:/temp/docs.obj”, which is about 29.7 GB. Of course this is very slow. Why does Spark try to read all 7 million objects while I only need to return a random sample of 70 objects? Is there any efficient way to get a random sample of 70 objects without reading through the entire object files? Ningjun
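There is no way around the scan Sean describes, but if the goal is an exact-size sample, takeSample is an option (a sketch, not from the thread; note it still reads the whole dataset and internally runs a count before sampling):

  val docs = sc.objectFile[Document]("C:/temp/docs.obj")
  val seventy: Array[Document] = docs.takeSample(withReplacement = false, num = 70, seed = 0L)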
Exception when using CLUSTER BY or ORDER BY
Under certain circumstances that I haven't yet been able to isolate, I get the following error when doing a HQL query using HiveContext (Spark 1.3.1 on Mesos, fine-grained mode). Is this a known problem or should I file a JIRA for it ? org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56) at org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259) at org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257) at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647) at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Re: Naming an DF aggregated column
customerDF.groupBy("state").agg(max($"discount").alias("newName")) (or .as(...); both functions can take a String or a Symbol) On Tue, May 19, 2015 at 2:11 PM, Cesar Flores ces...@gmail.com wrote: I would like to ask if there is a way of specifying the column name of a data frame aggregation. For example, if I do: customerDF.groupBy("state").agg(max($"discount")) the name of my aggregated column will be: MAX('discount) Is there a way of changing the name of that column to something else on the fly, and not after performing the aggregation? thanks -- Cesar Flores
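A slightly fuller sketch of the same pattern (the implicits import for the $"col" syntax is assumed to be available from a sqlContext in scope; "max_discount" is an arbitrary name):

  import org.apache.spark.sql.functions.max
  import sqlContext.implicits._   // enables the $"colName" syntax

  val agged = customerDF
    .groupBy("state")
    .agg(max($"discount").alias("max_discount"))

  agged.printSchema()   // the aggregate column now shows up as max_discount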
Re: Word2Vec with billion-word corpora
With vocabulary size 4M and vector size 400, you need 400 * 4M = 1.6B floats to store the model. That is about 6.4GB. We store the model on the driver node in the current implementation. So I don't think it would work. You might try increasing the minCount to decrease the vocabulary size and reduce the vector size. I'm interested in learning the trade-off between the model size and the model quality. If you have done some experiments, please let me know. Thanks! -Xiangrui On Wed, May 13, 2015 at 11:17 AM, Shilad Sen s...@macalester.edu wrote: Hi all, I'm experimenting with Spark's Word2Vec implementation for a relatively large (5B word, vocabulary size 4M, 400-dimensional vectors) corpus. Has anybody had success running it at this scale? Thanks in advance for your guidance! -Shilad -- Shilad W. Sen Associate Professor Mathematics, Statistics, and Computer Science Dept. Macalester College s...@macalester.edu http://www.shilad.com https://www.linkedin.com/in/shilad 651-696-6273
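A small sketch of the two knobs mentioned above (assuming a Spark version where Word2Vec exposes setMinCount; the values are made-up examples and corpus is a tokenized RDD[Seq[String]]):

  import org.apache.spark.mllib.feature.Word2Vec
  import org.apache.spark.rdd.RDD

  val corpus: RDD[Seq[String]] = ???       // tokenized sentences

  val model = new Word2Vec()
    .setVectorSize(200)                    // smaller vectors shrink the driver-side model
    .setMinCount(25)                       // drop rare words to shrink the vocabulary
    .fit(corpus)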
Hive 1.0 support in Spark
Does Spark 1.3.1 support Hive 1.0? If not, which version of Spark will start supporting Hive 1.0? -- Kannan
Re: Discretization
Thanks for asking! We should improve the documentation. The sample dataset is actually mimicking the MNIST digits dataset, where the values are gray levels (0-255). So by dividing by 16, we want to map them to 16 coarse bins for the gray levels. Actually, there is a bug in the doc: we should convert the values to integers first before dividing by 16. I created https://issues.apache.org/jira/browse/SPARK-7739 for this issue. Welcome to submit a patch :) Thanks! Best, Xiangrui On Thu, May 7, 2015 at 9:20 PM, spark_user_2015 li...@adobe.com wrote: The Spark documentation shows the following example code: // Discretize data in 16 equal bins since ChiSqSelector requires categorical features val discretizedData = data.map { lp => LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => x / 16 } ) ) } I'm sort of missing why x / 16 is considered a discretization approach here. [https://spark.apache.org/docs/latest/mllib-feature-extraction.html#feature-selection] -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Discretization-tp22811.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
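A sketch of the corrected snippet along the lines Xiangrui describes (integer division of the 0-255 gray levels into 16 bins), pending the doc fix tracked in SPARK-7739:

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint

  val discretizedData = data.map { lp =>
    LabeledPoint(lp.label,
      Vectors.dense(lp.features.toArray.map { x => (x.toInt / 16).toDouble }))
  }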
Naming an DF aggregated column
I would like to ask if there is a way of specifying the column name of a data frame aggregation. For example, if I do: customerDF.groupBy("state").agg(max($"discount")) the name of my aggregated column will be: MAX('discount) Is there a way of changing the name of that column to something else on the fly, and not after performing the aggregation? thanks -- Cesar Flores
Re: Does Python 2.7 have to be installed on every cluster node?
PySpark works with CPython by default, and you can specify which version of Python to use with: PYSPARK_PYTHON=path/to/python bin/spark-submit xxx.py When you do the upgrade, you could install Python 2.7 on every machine in the cluster and test it with PYSPARK_PYTHON=python2.7 bin/spark-submit xxx.py For YARN, you also need to install Python 2.7 on every node in the cluster. On Tue, May 19, 2015 at 7:44 AM, YaoPau jonrgr...@gmail.com wrote: We're running Python 2.6.6 here but we're looking to upgrade to 2.7.x in a month. Does pyspark work by converting Python into Java bytecode, or does it run Python natively? And along those lines, if we're running in yarn-client mode, would we have to upgrade just the edge node version of Python, or every node in the cluster? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Python-2-7-have-to-be-installed-on-every-cluster-node-tp22945.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Stratified sampling with DataFrames
You need to convert the DataFrame to an RDD, call sampleByKey, and then apply the schema back to create a DataFrame. val df: DataFrame = ... val schema = df.schema val sampledRDD = df.rdd.keyBy(r => r.getAs[Int](0)).sampleByKey(...).values val sampled = sqlContext.createDataFrame(sampledRDD, schema) Hopefully this will be much easier in 1.5. Best, Xiangrui On Mon, May 11, 2015 at 12:32 PM, Karthikeyan Muthukumar mkarthiksw...@gmail.com wrote: Hi, I'm on Spark 1.3.0 and my data is in DataFrames. I need operations like sampleByKey() and sampleByKeyExact(). I saw the JIRA "Add approximate stratified sampling to DataFrame" (https://issues.apache.org/jira/browse/SPARK-7157). That's targeted for Spark 1.5; until that comes through, what's the easiest way to accomplish the equivalent of sampleByKey() and sampleByKeyExact() on DataFrames? Thanks and Regards MK
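A slightly fuller version of the recipe with an explicit fractions map (a sketch; df and sqlContext are assumed to be in scope, and the key column index and fractions are made-up examples):

  val schema = df.schema
  val fractions = Map(0 -> 0.1, 1 -> 0.2)             // per-key sampling fractions
  val sampledRDD = df.rdd
    .keyBy(r => r.getAs[Int](0))                      // stratify on the first column
    .sampleByKey(withReplacement = false, fractions)
    .values
  val sampled = sqlContext.createDataFrame(sampledRDD, schema)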
Re: Spark Streaming + Kafka failure recovery
If you checkpoint, the job will start from the successfully consumed offsets. If you don't checkpoint, by default it will start from the highest available offset, and you will potentially lose data. Is the link I posted, or for that matter the scaladoc, really not clear on that point? The scaladoc says: To recover from driver failures, you have to enable checkpointing in the StreamingContext http://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/StreamingContext.html. The information on consumed offsets can be recovered from the checkpoint. On Tue, May 19, 2015 at 2:38 PM, Bill Jay bill.jaypeter...@gmail.com wrote: If a Spark streaming job stops at 12:01 and I resume the job at 12:02, will it still start consuming the data that were produced to Kafka at 12:01? Or will it just start consuming from the current time? On Tue, May 19, 2015 at 10:58 AM, Cody Koeninger c...@koeninger.org wrote: Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ? 1. There's nothing preventing that. 2. Checkpointing will give you at-least-once semantics, provided you have sufficient Kafka retention. Be aware that checkpoints aren't recoverable if you upgrade code. On Tue, May 19, 2015 at 12:42 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently using Spark streaming to consume and save logs every hour in our production pipeline. The current setting is to run a crontab job to check every minute whether the job is still there and, if not, resubmit a Spark streaming job. I am currently using the direct approach for the Kafka consumer. I have two questions: 1. In the direct approach, no offset is stored in ZooKeeper and no group id is specified. Can two consumers (one is Spark streaming and the other is a Kafka console consumer in the Kafka package) read from the same topic from the brokers together (I would like both of them to get all messages, i.e. publish-subscribe mode)? What about two Spark streaming jobs reading from the same topic? 2. How do I avoid data loss if a Spark job is killed? Does checkpointing serve this purpose? The default behavior of Spark streaming is to read the latest logs. However, if a job is killed, can the new job resume from what was left to avoid losing logs? Thanks! Bill
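A sketch of the checkpoint-based recovery pattern Cody refers to (the app name, checkpoint directory, batch interval, and the stream setup are placeholders):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val checkpointDir = "hdfs:///checkpoints/my-streaming-app"

  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-app"), Seconds(60))
    ssc.checkpoint(checkpointDir)          // consumed offsets are recovered from here
    // ... create the direct Kafka stream and wire up the per-batch save logic ...
    ssc
  }

  // On restart, rebuild from the checkpoint if one exists; otherwise start fresh.
  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()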
How to set the file size for parquet Part
Hi I'm using Spark 1.3.1 and I can't set the size of the part files generated for Parquet. The size is only 512KB, which is really too small; I need to make them bigger. How can I set this? Thanks
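Not from the thread, but one common way to get fewer, larger part files in 1.3.x is to reduce the number of partitions right before the write (a sketch; the value 8 is an arbitrary example):

  // Each partition becomes one part-xxxxx file, so fewer partitions means bigger files.
  // repartition incurs a shuffle, but guarantees the requested partition count.
  df.repartition(8).saveAsParquetFile("/path/to/output")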
Re: RandomSplit with Spark-ML and Dataframe
Thank you! On Tue, May 19, 2015 at 9:08 PM, Xiangrui Meng men...@gmail.com wrote: In 1.4, we added RAND as a DataFrame expression, which can be used for random split. Please check the example here: https://github.com/apache/spark/blob/master/python/pyspark/ml/tuning.py#L214 -Xiangrui On Thu, May 7, 2015 at 8:39 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, is there any best practice to do, like in MLlib, a randomSplit of training/cross-validation sets with dataframes and the pipeline API? Regards Olivier.
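For reference, in 1.4 the DataFrame API also gained randomSplit directly (a sketch, assuming Spark 1.4+; the weights and seed are arbitrary):

  val Array(training, validation) = df.randomSplit(Array(0.8, 0.2), seed = 42L)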
Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar
Hey Jaonary, I saw this line in the error message: org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163) CaseClassStringParser is only used in older versions of Spark to parse schema from JSON. So I suspect that the cluster was running an old version of Spark when you used spark-submit to run your assembly jar. Best, Xiangrui On Mon, May 11, 2015 at 7:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: In this example, everything works except saving to the parquet file. On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: MyDenseVectorUDT does exist in the assembly jar, and in this example all the code is in a single file to make sure everything is included. On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng men...@gmail.com wrote: You should check where MyDenseVectorUDT is defined and whether it was on the classpath (or in the assembly jar) at runtime. Make sure the full class name (with package name) is used. Btw, UDTs are not public yet, so please use them with caution. -Xiangrui On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an example of code to reproduce the issue I mentioned in a previous mail about saving a UserDefinedType into a parquet file. The problem here is that the code works when I run it inside IntelliJ IDEA but fails when I create the assembly jar and run it with spark-submit. I use the master version of Spark. @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT]) class MyDenseVector(val data: Array[Double]) extends Serializable { override def equals(other: Any): Boolean = other match { case v: MyDenseVector => java.util.Arrays.equals(this.data, v.data) case _ => false } } class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] { override def sqlType: DataType = ArrayType(DoubleType, containsNull = false) override def serialize(obj: Any): Seq[Double] = { obj match { case features: MyDenseVector => features.data.toSeq } } override def deserialize(datum: Any): MyDenseVector = { datum match { case data: Seq[_] => new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray) } } override def userClass: Class[MyDenseVector] = classOf[MyDenseVector] } case class Toto(imageAnnotation: MyDenseVector) object TestUserDefinedType { case class Params(input: String = null, partitions: Int = 12, outputDir: String = "images.parquet") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val rawImages = sc.parallelize((1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF() rawImages.printSchema() rawImages.show() rawImages.save("toto.parquet") // This fails with the assembly jar sc.stop() } } My build.sbt is as follows: libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion, "org.apache.spark" %% "spark-mllib" % sparkVersion ) assemblyMergeStrategy in assembly := { case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first case PathList("org", "apache", xs @ _*) => MergeStrategy.first case PathList("org", "jboss", xs @ _*) => MergeStrategy.first // case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first // case "application.conf" => MergeStrategy.concat case m if m.startsWith("META-INF") => MergeStrategy.discard //case x => // val oldStrategy = (assemblyMergeStrategy in assembly).value // oldStrategy(x) case _ => MergeStrategy.first } As I said, this code works without
any problem when I execute it inside IntelliJ IDEA. But when I generate the assembly jar with sbt-assembly and use spark-submit, I get the following error: 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7) java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]} ^ at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163) at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
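Not from the thread, but a quick way to check Xiangrui's hypothesis that the cluster is running an older Spark than the one the jar was built against is to print the runtime version from inside the job:

  println(s"Runtime Spark version: ${sc.version}")   // compare with the version pinned in build.sbt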
Re: Increase maximum amount of columns for covariance matrix for principal components
We use a dense array to store the covariance matrix on the driver node. So its length is limited by the integer range, which is 65536 * 65536 (actually half of that). -Xiangrui On Wed, May 13, 2015 at 1:57 AM, Sebastian Alfers sebastian.alf...@googlemail.com wrote: Hello, in order to compute a huge dataset, the number of columns used to calculate the covariance matrix is limited: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L129 What is the reason behind this limitation and can it be extended? Greetings Sebastian
Re: k-means core function for temporal geo data
I'm not sure whether k-means would converge with this customized distance measure. You can list (weighted) time as a feature along with the coordinates, and then use Euclidean distance. For other supported distance measures, you can check Derrick's package: http://spark-packages.org/package/derrickburns/generalized-kmeans-clustering. -Xiangrui On Mon, May 18, 2015 at 2:30 AM, Pa Rö paul.roewer1...@googlemail.com wrote: Hello, I want to cluster geo data (lat, long, timestamp) with k-means. Now I am searching for a good core (distance) function, and I cannot find good papers or other sources for that. At the moment I multiply the time distance and the space distance: public static double dis(GeoData input1, GeoData input2) { double timeDis = Math.abs( input1.getTime() - input2.getTime() ); double geoDis = geoDis(input1, input2); // extra function return timeDis*geoDis; } Maybe someone knows a good core function for clustering temporal geo data? (need citation)
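A sketch of the weighted-feature approach Xiangrui suggests (GeoPoint, its field names, and the time weight are made up; the weight trades seconds off against degrees and needs tuning for the data at hand):

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.rdd.RDD

  case class GeoPoint(lat: Double, lon: Double, timestamp: Double)

  val geo: RDD[GeoPoint] = ???                  // the temporal geo data
  val timeWeight = 0.001                        // hypothetical scaling of time vs. coordinates

  val features = geo.map(g => Vectors.dense(g.lat, g.lon, timeWeight * g.timestamp)).cache()
  val model = KMeans.train(features, k = 10, maxIterations = 20)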
Re: spark mllib kmeans
Just curious, what distance measure do you need? -Xiangrui On Mon, May 11, 2015 at 8:28 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Take a look at this: https://github.com/derrickburns/generalized-kmeans-clustering Best, Jao On Mon, May 11, 2015 at 3:55 PM, Driesprong, Fokko fo...@driesprong.frl wrote: Hi Paul, I would say that it should be possible, but you'll need a different distance measure which conforms to your coordinate system. 2015-05-11 14:59 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com: Hi, is it possible to use a custom distance measure and another data type as vector? I want to cluster temporal geo data. Best regards, Paul