Re: error in running StructuredStreaming-Kafka integration code (Spark 2.x & Kafka 10)
Karen, It looks like the Kafka version is incorrect. You mention Kafka 0.10 however the classpath references Kafka 0.9 Thanks, David On July 10, 2017 at 1:44:06 PM, karan alang (karan.al...@gmail.com) wrote: Hi All, I'm running Spark Streaming - Kafka integration using Spark 2.x & Kafka 10. & seems to be running into issues. I compiled the program using sbt, and the compilation went through fine. I was able able to import this into Eclipse & run the program from Eclipse. However, when i run the program using spark-submit, i'm getting the following error : -- > $SPARK_HOME/bin/spark-submit --class > "structuredStreaming.kafka.StructuredKafkaWordCount1" > --master local[2] /Users/karanalang/Documents/Te > chnology/Coursera_spark_scala/structuredStreamingKafka/targe > t/scala-2.11/StructuredStreamingKafka-assembly-1.0.jar > *java.lang.ClassNotFoundException: > structuredStreaming.kafka.StructuredKafkaWordCount1* > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at org.apache.spark.util.Utils$.classForName(Utils.scala:229) > at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy > $SparkSubmit$$runMain(SparkSubmit.scala:695) > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > -- I've put the jar in the classpath, but i still get the error -> echo $CLASSPATH .:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0. > 9.0.1/lib/jopt-simple-3.2.jar:/Users/karanalang/Documents/Te > chnology/kafka/kafka_2.11-0.9.0.1/lib/kafka-clients-0.9.0.1. > jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11- > 0.9.0.1/lib/kafka_2.11-0.9.0.1.jar:/Users/karanalang/ > Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/log4j- > 1.2.17.jar:/Users/karanalang/Documents/Technology/kafka/ > kafka_2.11-0.9.0.1/lib/metrics-core-2.2.0.jar:/Users/karanalang/Documents/ > Technology/kafka/kafka_2.11-0.9.0.1/lib/scala-library-2.11. > 7.jar:/Users/karanalang/Documents/Technology/kafka/ > kafka_2.11-0.9.0.1/lib/slf4j-api-1.7.6.jar:/Users/ > karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/ > lib/slf4j-log4j12-1.7.6.jar:/Users/karanalang/Documents/ > Technology/kafka/kafka_2.11-0.9.0.1/lib/snappy-java-1.1.1.7. > jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/ > zkclient-0.7.jar:/Users/karanalang/Documents/Technolog > y/kafka/kafka_2.11-0.9.0.1/lib/zookeeper-3.4.6.jar:/ > Users/karanalang/Documents/Technology/ApacheSpark-v2.1/spark > -2.1.0-bin-hadoop2.7/jars/*.jar:/Users/karanalang/Document > s/Technology/kafka/mirrormaker_topic_rename-master/target/ > mmchangetopic-1.0-SNAPSHOT.jar:/Users/karanalang/Documents/Technology/ > *Coursera_spark_scala/structuredStreamingKafka/target/scala-2.11/* > *StructuredStreamingKafka-assembly-1.0.jar* When i look inside the jar - *StructuredStreamingKafka-assembly-1.0.jar, i don't see the file "*StructuredKafkaWordCount1.class" Attaching my build.sbt. Any ideas on what i need to do ? - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
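A rough sketch of what to check, assuming Spark 2.1.x on Scala 2.11 (versions here are guesses and should match your cluster). For structured streaming against Kafka 0.10 the spark-sql-kafka-0-10 artifact is the usual dependency, and listing the assembly jar shows whether the main class actually made it in:

// build.sbt fragment (versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"
)

# confirm the class is actually inside the assembly before spark-submit
jar tf target/scala-2.11/StructuredStreamingKafka-assembly-1.0.jar | grep StructuredKafkaWordCount1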
RE: HBase-Spark Module
Hi Ben, This seems more like a question for community.cloudera.com. However, I believe the module lives in HBase, not Spark: https://repository.cloudera.com/artifactory/webapp/#/artifacts/browse/tree/General/cloudera-release-repo/org/apache/hbase/hbase-spark David Newberger -Original Message- From: Benjamin Kim [mailto:bbuil...@gmail.com] Sent: Friday, July 29, 2016 12:57 PM To: user@spark.apache.org Subject: HBase-Spark Module I would like to know if anyone has tried using the hbase-spark module? I tried to follow the examples in conjunction with CDH 5.8.0. I cannot find the HBaseTableCatalog class in the module or in any of the Spark jars. Can someone help? Thanks, Ben - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
RE: difference between dataframe and dataframwrite
A DataFrame is a distributed collection of data organized into named columns. DataFrame.write returns a DataFrameWriter, an interface for saving the contents of a DataFrame to external storage. Hope this helps. David Newberger From: pseudo oduesp [mailto:pseudo20...@gmail.com] Sent: Thursday, June 16, 2016 9:43 AM To: user@spark.apache.org Subject: difference between dataframe and dataframwrite hi, what is the difference between DataFrame and DataFrame.write?
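A small hedged illustration of the difference (the paths and format are made-up placeholders):

// a DataFrame: data you can query and transform
val df = sqlContext.read.json("people.json")
df.select("name", "age").show()

// DataFrame.write gives back a DataFrameWriter, used only for persisting the data
df.write.format("parquet").mode("overwrite").save("/tmp/people_parquet")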
RE: streaming example has error
Try adding wordCounts.print() before ssc.start() David Newberger From: Lee Ho Yeung [mailto:jobmatt...@gmail.com] Sent: Wednesday, June 15, 2016 9:16 PM To: David Newberger Cc: user@spark.apache.org Subject: Re: streaming example has error got another error StreamingContext: Error starting the context, marking it as stopped /home/martin/Downloads/spark-1.6.1/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0 import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount").set("spark.driver.allowMultipleContexts", "true") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", ) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) ssc.start() ssc.awaitTermination() scala> val pairs = words.map(word => (word, 1)) pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@61a5e7<mailto:org.apache.spark.streaming.dstream.MappedDStream@61a5e7> scala> val wordCounts = pairs.reduceByKey(_ + _) wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@a522f1<mailto:org.apache.spark.streaming.dstream.ShuffledDStream@a522f1> scala> ssc.start() 16/06/15 19:14:10 ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute at scala.Predef$.require(Predef.scala:233) at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542) at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:44) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:46) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:48) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:50) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:52) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:54) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:60) at $line42.$read$$iwC$$iwC$$iwC$$iwC.(:62) at $line42.$read$$iwC$$iwC$$iwC.(:64) at $line42.$read$$iwC$$iwC.(:66) at $line42.$read$$iwC.(:68) at $line42.$read.(:70) at $line42.$read$.(:74) at $line42.$read$.() at $line42.$eval$.(:7) at $line42.$eval$.() at $line42.$eval.$print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org<http://org.apache.spark.repl.SparkILoop.org>$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$proces
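As suggested above, registering an output operation before starting the context avoids the "No output operations registered" failure. A minimal sketch (the socket port and batch interval are placeholders; in spark-shell, sc already exists, so the sketch reuses it to stay short):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()   // the output operation, registered before start()
ssc.start()
ssc.awaitTermination()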
RE: Limit pyspark.daemon threads
Have you tried setting spark.cores.max “When running on a standalone deploy cluster<http://spark.apache.org/docs/latest/spark-standalone.html> or a Mesos cluster in "coarse-grained" sharing mode<http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-run-modes>, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will bespark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos.” David Newberger From: agateaaa [mailto:agate...@gmail.com] Sent: Wednesday, June 15, 2016 4:39 PM To: Gene Pang Cc: Sven Krasser; Carlile, Ken; user Subject: Re: Limit pyspark.daemon threads Thx Gene! But my concern is with CPU usage not memory. I want to see if there is anyway to control the number of pyspark.daemon processes that get spawned. We have some restriction on number of CPU's we can use on a node, and number of pyspark.daemon processes that get created dont seem to honor spark.executor.cores property setting Thanks! On Wed, Jun 15, 2016 at 1:53 PM, Gene Pang mailto:gene.p...@gmail.com>> wrote: As Sven mentioned, you can use Alluxio to store RDDs in off-heap memory, and you can then share that RDD across different jobs. If you would like to run Spark on Alluxio, this documentation can help: http://www.alluxio.org/documentation/master/en/Running-Spark-on-Alluxio.html Thanks, Gene On Tue, Jun 14, 2016 at 12:44 AM, agateaaa mailto:agate...@gmail.com>> wrote: Hi, I am seeing this issue too with pyspark (Using Spark 1.6.1). I have set spark.executor.cores to 1, but I see that whenever streaming batch starts processing data, see python -m pyspark.daemon processes increase gradually to about 5, (increasing CPU% on a box about 4-5 times, each pyspark.daemon takes up around 100 % CPU) After the processing is done 4 pyspark.daemon processes go away and we are left with one till the next batch run. Also sometimes the CPU usage for executor process spikes to about 800% even though spark.executor.core is set to 1 e.g. top output PID USER PR NI VIRT RES SHR S %CPU %MEMTIME+ COMMAND 19634 spark 20 0 8871420 1.790g 32056 S 814.1 2.9 0:39.33 /usr/lib/j+ <--EXECUTOR 13897 spark 20 0 46576 17916 6720 S 100.0 0.0 0:00.17 python -m + <--pyspark.daemon 13991 spark 20 0 46524 15572 4124 S 98.0 0.0 0:08.18 python -m + <--pyspark.daemon 14488 spark 20 0 46524 15636 4188 S 98.0 0.0 0:07.25 python -m + <--pyspark.daemon 14514 spark 20 0 46524 15636 4188 S 94.0 0.0 0:06.72 python -m + <--pyspark.daemon 14526 spark 20 0 48200 17172 4092 S 0.0 0.0 0:00.38 python -m + <--pyspark.daemon Is there any way to control the number of pyspark.daemon processes that get spawned ? Thank you Agateaaa On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser mailto:kras...@gmail.com>> wrote: Hey Ken, 1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap storage option using Alluxio, formerly Tachyon, with which I have no experience however.) 2. The worker memory setting is not a hard maximum unfortunately. What happens is that during aggregation the Python daemon will check its process size. If the size is larger than this setting, it will start spilling to disk. I've seen many occasions where my daemons grew larger. Also, you're relying on Python's memory management to free up space again once objects are evicted. In practice, leave this setting reasonably small but make sure there's enough free memory on the machine so you don't run into OOM conditions. 
If the lower memory setting causes strains for your users, make sure they increase the parallelism of their jobs (smaller partitions meaning less data is processed at a time). 3. I believe that is the behavior you can expect when setting spark.executor.cores. I've not experimented much with it and haven't looked at that part of the code, but what you describe also reflects my understanding. Please share your findings here, I'm sure those will be very helpful to others, too. One more suggestion for your users is to move to the Pyspark DataFrame API. Much of the processing will then happen in the JVM, and you will bump into fewer Python resource contention issues. Best, -Sven On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken mailto:carli...@janelia.hhmi.org>> wrote: This is extremely helpful! I’ll have to talk to my users about how the python memory limit should be adjusted and what their expectations are. I’m fairly certain we bumped it up in the dark past when jobs were failing because of insufficient memory for the python processes. So just to make sure I’m understanding correctly: * JVM memory (set by SPARK_EXECUTOR_MEMORY and/or SPARK_WORKER_MEMORY?) is wher
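For reference, a hedged sketch of the knobs mentioned in this thread as they might appear in spark-defaults.conf (the values are only examples, not recommendations):

spark.cores.max              8      # cap on total cores per application (standalone / coarse-grained Mesos)
spark.executor.cores         1      # cores per executor
spark.python.worker.memory   512m   # per Python worker memory before it starts spilling to disk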
RE: Handle empty kafka in Spark Streaming
Hi Yogesh, I'm not sure if this is possible or not. I'd be interested in knowing. My gut thinks it would be an anti-pattern if it's possible to do something like this and that's why I handle it in either the foreachRDD or foreachPartition. The way I look at spark streaming is as an application which is always running and doing something like windowed batching or microbatching or whatever I'm trying to accomplish. IF an RDD I get from Kafka is empty then I don't run the rest of the job. IF the RDD I'm get from Kafka has some number of events then I'll process the RDD further. David Newberger -Original Message- From: Yogesh Vyas [mailto:informy...@gmail.com] Sent: Wednesday, June 15, 2016 8:30 AM To: David Newberger Subject: Re: Handle empty kafka in Spark Streaming I am looking for something which checks the JavaPairReceiverInputDStreambefore further going for any operations. For example, if I have get JavaPairReceiverInputDStream in following manner: JavaPairReceiverInputDStream message=KafkaUtils.createStream(ssc, zkQuorum, group, topics, StorageLevel.MEMORY_AND_DISK_SER()); Then I would like check whether message is empty or not. If it not empty then go for further operations else wait for some data in Kafka. On Wed, Jun 15, 2016 at 6:31 PM, David Newberger wrote: > If you're asking how to handle no messages in a batch window then I would add > an isEmpty check like: > > dStream.foreachRDD(rdd => { > if (!rdd.isEmpty()) > ... > } > > Or something like that. > > > David Newberger > > -Original Message- > From: Yogesh Vyas [mailto:informy...@gmail.com] > Sent: Wednesday, June 15, 2016 6:31 AM > To: user > Subject: Handle empty kafka in Spark Streaming > > Hi, > > Does anyone knows how to handle empty Kafka while Spark Streaming job is > running ? > > Regards, > Yogesh > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For > additional commands, e-mail: user-h...@spark.apache.org >
RE: Handle empty kafka in Spark Streaming
If you're asking how to handle no messages in a batch window then I would add an isEmpty check like: dStream.foreachRDD(rdd => { if (!rdd.isEmpty()) ... } Or something like that. David Newberger -Original Message- From: Yogesh Vyas [mailto:informy...@gmail.com] Sent: Wednesday, June 15, 2016 6:31 AM To: user Subject: Handle empty kafka in Spark Streaming Hi, Does anyone knows how to handle empty Kafka while Spark Streaming job is running ? Regards, Yogesh - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
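Filled out a little, a sketch of that pattern (the per-record work is a placeholder):

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // only process batches that actually contain Kafka messages
    rdd.foreachPartition { partition =>
      partition.foreach(record => println(record))
    }
  }
}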
RE: streaming example has error
Have you tried to “set spark.driver.allowMultipleContexts = true”? David Newberger From: Lee Ho Yeung [mailto:jobmatt...@gmail.com] Sent: Tuesday, June 14, 2016 8:34 PM To: user@spark.apache.org Subject: streaming example has error when simulate streaming with nc -lk got error below, then i try example, martin@ubuntu:~/Downloads$ /home/martin/Downloads/spark-1.6.1/bin/run-example streaming.NetworkWordCount localhost Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 16/06/14 18:33:06 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath. 16/06/14 18:33:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 16/06/14 18:33:06 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.157.134 instead (on interface eth0) 16/06/14 18:33:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 16/06/14 18:33:13 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes got error too. import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", ) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) ssc.start() ssc.awaitTermination() scala> import org.apache.spark._ import org.apache.spark._ scala> import org.apache.spark.streaming._ import org.apache.spark.streaming._ scala> import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.StreamingContext._ scala> val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@67bcaf<mailto:org.apache.spark.SparkConf@67bcaf> scala> val ssc = new StreamingContext(conf, Seconds(1)) 16/06/14 18:28:44 WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040<http://SelectChannelConnector@0.0.0.0:4040>: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:433) at sun.nio.ch.Net.bind(Net.java:425) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.server.Server.doStart(Server.java:293) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:252) at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262) at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1988) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at 
org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1979) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:262) at org.apache.spark.ui.WebUI.bind(WebUI.scala:136) at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481) at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481) at scala.Option.foreach(Option.scala:236) at org.apache.spark.SparkContext.(SparkContext.scala:481) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874) at org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:81) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:36) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
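A sketch of the setting suggested above, applied where the SparkConf is built (the app name and master are just the values from the example):

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("NetworkWordCount")
  .set("spark.driver.allowMultipleContexts", "true")
val ssc = new StreamingContext(conf, Seconds(1))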
RE: Creating a Hive table through Spark and potential locking issue (a bug)
Could you be looking at 2 jobs trying to use the same file and one getting to it before the other and finally removing it? David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com] Sent: Wednesday, June 8, 2016 1:33 PM To: user; user @spark Subject: Creating a Hive table through Spark and potential locking issue (a bug) Hi, I noticed an issue with Spark creating and populating a Hive table. The process as I see is as follows: 1. Spark creates the Hive table. In this case an ORC table in a Hive Database 2. Spark uses JDBC connection to get data out from an Oracle 3. I create a temp table in Spark through (registerTempTable) 4. Spark populates that table. That table is actually created in hdfs dfs -ls /tmp/hive/hduser drwx-- - hduser supergroup /tmp/hive/hduser/b1ea6829-790f-4b37-a0ff-3ed218388059 1. However, The original table itself does not have any locking on it! 2. I log in into Hive and drop that table 3. hive> drop table dummy; OK 1. That table is dropped OK 2. Spark crashes with message Started at [08/06/2016 18:37:53.53] 16/06/08 19:13:46 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/hive/warehouse/oraclehadoop.db/dummy/.hive-staging_hive_2016-06-08_18-38-08_804_3299712811201460314-1/-ext-1/_temporary/0/_temporary/attempt_201606081838_0001_m_00_0/part-0 (inode 831621): File does not exist. Holder DFSClient_NONMAPREDUCE_-1836386597_1 does not have any open files. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033) at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy22.addBlock(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399) at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at 
com.sun.proxy.$Proxy23.addBlock(Unknown Source) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1532) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1349) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588) 16/06/08 19:13:46 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job Suggested solution. In a concurrent env, Spark should apply locks in order to prevent such operations. Locks are kept in Hive meta data table HIVE_LOCKS HTH Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>
RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.
Hi Mich, My gut says you are correct that each application should have its own checkpoint directory. Though honestly I’m a bit fuzzy on checkpointing still as I’ve not worked with it much yet. Cheers, David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com] Sent: Friday, June 3, 2016 3:40 PM To: David Newberger Cc: user @spark Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. Hi David yes they do The first streaming job does val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") And the twitter does /** Returns the HDFS URL */ def getCheckpointDirectory(): String = { try { val name : String = Seq("bash", "-c", "curl -s http://169.254.169.254/latest/meta-data/hostname";) !! ; println("Hostname = " + name) "hdfs://" + name.trim + ":9000/checkpoint/" } catch { case e: Exception => { "./checkpoint/" } } I need to change one of these. Actually a better alternative would be that each application has its own checkpoint? THanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/> On 3 June 2016 at 21:23, David Newberger mailto:david.newber...@wandcorp.com>> wrote: I was going to ask if you had 2 jobs running. If the checkpointing for both are setup to look at the same location I could see an error like this happening. Do both spark jobs have a reference to a checkpointing dir? David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>] Sent: Friday, June 3, 2016 3:20 PM To: user @spark Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. OK I was running two spark streaming jobs, one using streaming data from Kafka and another from twitter in local mode on the same node. It is possible that the directory /user/hduser/checkpoint/temp is shared by both spark streaming jobs any experience on this please? Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/> On 3 June 2016 at 20:48, Mich Talebzadeh mailto:mich.talebza...@gmail.com>> wrote: Hi, Just started seeing these errors: 16/06/03 20:30:01 ERROR DFSClient: Failed to close inode 806125 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-907736468_1, pendingcreates: 1] at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) Sounds like a connection is left open but cannot establish why! 
Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>
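A minimal sketch of giving each streaming application its own checkpoint directory (the namenode host and paths are assumptions):

// Kafka streaming job
ssc.checkpoint("hdfs://namenode-host:9000/checkpoint/kafka_job")

// Twitter streaming job
ssc.checkpoint("hdfs://namenode-host:9000/checkpoint/twitter_job")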
RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.
I was going to ask if you had 2 jobs running. If the checkpointing for both are setup to look at the same location I could see an error like this happening. Do both spark jobs have a reference to a checkpointing dir? David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com] Sent: Friday, June 3, 2016 3:20 PM To: user @spark Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. OK I was running two spark streaming jobs, one using streaming data from Kafka and another from twitter in local mode on the same node. It is possible that the directory /user/hduser/checkpoint/temp is shared by both spark streaming jobs any experience on this please? Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/> On 3 June 2016 at 20:48, Mich Talebzadeh mailto:mich.talebza...@gmail.com>> wrote: Hi, Just started seeing these errors: 16/06/03 20:30:01 ERROR DFSClient: Failed to close inode 806125 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-907736468_1, pendingcreates: 1] at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) Sounds like a connection is left open but cannot establish why! Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>
RE: Spark Streaming - long garbage collection time
Have you tried UseG1GC in place of UseConcMarkSweepGC? This article really helped me with GC a few short weeks ago https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html David Newberger -Original Message- From: Marco1982 [mailto:marco.plata...@yahoo.it] Sent: Friday, June 3, 2016 2:19 PM To: user@spark.apache.org Subject: Spark Streaming - long garbage collection time Hi all, I'm running a Spark Streaming application with 1-hour batches to join two data feeds and write the output to disk. The total size of one data feed is about 40 GB per hour (split in multiple files), while the size of the second data feed is about 600-800 MB per hour (also split in multiple files). Due to application constraints, I may not be able to run smaller batches. Currently, it takes about 20 minutes to produce the output in a cluster with 140 cores and 700 GB of RAM. I'm running 7 workers and 28 executors, each with 5 cores and 22 GB of RAM. I execute mapToPair(), filter(), and reduceByKeyAndWindow(1 hour batch) on the 40 GB data feed. Most of the computation time is spent on these operations. What worries me is the Garbage Collection (GC) execution time per executor, which goes from 25 secs to 9.2 mins. I attach two screenshots below: one lists the GC time and one prints out GC comments for a single executor. I anticipate that the executor that spends 9.2 mins in doing garbage collection is eventually killed by the Spark driver. I think these numbers are too high. Do you have any suggestion about keeping GC time low? I'm already using Kryo Serializer, ++UseConcMarkSweepGC, and spark.rdd.compress=true. Is there anything else that would help? Thanks <http://apache-spark-user-list.1001560.n3.nabble.com/file/n27087/gc_time.png> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n27087/executor_16.png> -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-long-garbage-collection-time-tp27087.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
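A hedged example of switching the executors over to G1GC at submit time (the class and jar names are made up; the GC flags are only a starting point):

spark-submit \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCDateStamps" \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.rdd.compress=true \
  --class com.example.StreamingJoinJob \
  streaming-join-assembly.jar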
RE: About a problem running a spark job in a cdh-5.7.0 vmware image.
Alonso, I could totally be misunderstanding something or missing a piece of the puzzle however remove .setMaster. If you do that it will run with whatever the CDH VM is setup for which in the out of the box default case is YARN and Client. val sparkConf = new SparkConf().setAppName(“Some App thingy thing”) From the Spark 1.6.0 Scala API Documentation: https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.SparkConf “ Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. Most of the time, you would create a SparkConf object with new SparkConf(), which will load values from any spark.* Java system properties set in your application as well. In this case, parameters you set directly on the SparkConf object take priority over system properties. For unit tests, you can also call new SparkConf(false) to skip loading external settings and get the same configuration no matter what the system properties are. All setter methods in this class support chaining. For example, you can write new SparkConf().setMaster("local").setAppName("My app"). Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user. Spark does not support modifying the configuration at runtime. “ David Newberger From: Alonso Isidoro Roman [mailto:alons...@gmail.com] Sent: Friday, June 3, 2016 10:37 AM To: David Newberger Cc: user@spark.apache.org Subject: Re: About a problem running a spark job in a cdh-5.7.0 vmware image. Thank you David, so, i would have to change the way that i am creating SparkConf object, isn't? I can see in this link<http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html#concept_ysw_lnp_h5> that the way to run a spark job using YARN is using this kind of command: spark-submit --class org.apache.spark.examples.SparkPi --master yarn \ --deploy-mode client SPARK_HOME/lib/spark-examples.jar 10 Can i use this way programmatically? maybe changing setMaster? to something like setMaster("yarn:quickstart.cloudera:8032")? I have seen the port in this guide: http://www.cloudera.com/documentation/enterprise/5-6-x/topics/cdh_ig_ports_cdh5.html
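Putting the two pieces together, a sketch (the jar path is a placeholder; the class name is the one from your logs):

// in the application: no setMaster, so the master comes from spark-submit
val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
val sc = new SparkContext(sparkConf)

// on the CDH VM:
// spark-submit --master yarn --deploy-mode client \
//   --class example.spark.AmazonKafkaConnector target/pack/lib/your-app.jar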
RE: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1
What does your processing time look like? Is it consistently within that 20-second micro-batch window? David Newberger From: Adrian Tanase [mailto:atan...@adobe.com] Sent: Friday, June 3, 2016 8:14 AM To: user@spark.apache.org Cc: Cosmin Ciobanu Subject: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1 Hi all, Trying to repost this question from a colleague on my team; somehow his subscription is not active: http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html Appreciate any thoughts, -adrian
RE: About a problem running a spark job in a cdh-5.7.0 vmware image.
Alonso, The CDH VM uses YARN and the default deploy mode is client. I’ve been able to use the CDH VM for many learning scenarios. http://www.cloudera.com/documentation/enterprise/latest.html http://www.cloudera.com/documentation/enterprise/latest/topics/spark.html David Newberger From: Alonso [mailto:alons...@gmail.com] Sent: Friday, June 3, 2016 5:39 AM To: user@spark.apache.org Subject: About a problem running a spark job in a cdh-5.7.0 vmware image. Hi, i am developing a project that needs to use kafka, spark-streaming and spark-mllib, this is the github project<https://github.com/alonsoir/awesome-recommendation-engine/tree/develop>. I am using a vmware cdh-5.7-0 image, with 4 cores and 8 GB of ram, the file that i want to use is only 16 MB, if i finding problems related with resources because the process outputs this message: .set("spark.driver.allowMultipleContexts", "true") <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links> 16/06/03 11:58:09 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources when i go to spark-master page, i can see this: Spark Master at spark://192.168.30.137:7077 URL: spark://192.168.30.137:7077 REST URL: spark://192.168.30.137:6066 (cluster mode) Alive Workers: 0 Cores in use: 0 Total, 0 Used Memory in use: 0.0 B Total, 0.0 B Used Applications: 2 Running, 0 Completed Drivers: 0 Running, 0 Completed Status: ALIVE Workers Worker Id Address State Cores Memory Running Applications Application ID Name Cores Memory per Node Submitted Time User State Duration app-20160603115752-0001 (kill) AmazonKafkaConnector 0 1024.0 MB 2016/06/03 11:57:52 cloudera WAITING 2.0 min app-20160603115751- (kill) AmazonKafkaConnector 0 1024.0 MB 2016/06/03 11:57:51 cloudera WAITING 2.0 min And this is the spark-worker output: Spark Worker at 192.168.30.137:7078 ID: worker-20160603115937-192.168.30.137-7078 Master URL: Cores: 4 (0 Used) Memory: 6.7 GB (0.0 B Used) Back to Master Running Executors (0) ExecutorID Cores State Memory Job Details Logs It is weird isn't ? master url is not set up and there is not any ExecutorID, Cores, so on so forth... 
If i do a ps xa | grep spark, this is the output: [cloudera@quickstart bin]$ ps xa | grep spark 6330 ?Sl 0:11 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Dspark.deploy.defaultCores=4 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master 6674 ?Sl 0:12 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /etc/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Dspark.history.fs.logDirectory=hdfs:///user/spark/applicationHistory -Dspark.history.ui.port=18088 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.history.HistoryServer 8153 pts/1Sl+0:14 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /home/cloudera/awesome-recommendation-engine/target/pack/lib/* -Dprog.home=/home/cloudera/awesome-recommendation-engine/target/pack -Dprog.version=1.0-SNAPSHOT example.spark.AmazonKafkaConnector 192.168.1.35:9092 amazonRatingsTopic 8413 ?Sl 0:04 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker spark://quickstart.cloudera:7077 8619 pts/3S+ 0:00 grep spark master is set up with four cores and 1 GB and worker has not any dedicated core and it is using 1GB, that is weird isn't ? I have configured the vmware image with 4 cores (from eight)
RE: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image
Have you tried it without either of the setMaster lines? Also, CDH 5.7 uses spark 1.6.0 with some patches. I would recommend using the cloudera repo for spark files in build sbt. I’d also check other files in the build sbt to see if there are cdh specific versions. David Newberger From: Alonso Isidoro Roman [mailto:alons...@gmail.com] Sent: Tuesday, May 31, 2016 1:23 PM To: David Newberger Cc: user@spark.apache.org Subject: Re: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image Hi David, the one of the develop branch. I think It should be the same, but actually not sure... Regards Alonso Isidoro Roman about.me/alonso.isidoro.roman 2016-05-31 19:40 GMT+02:00 David Newberger mailto:david.newber...@wandcorp.com>>: Is https://github.com/alonsoir/awesome-recommendation-engine/blob/master/build.sbt the build.sbt you are using? David Newberger QA Analyst WAND - The Future of Restaurant Technology (W) www.wandcorp.com<http://www.wandcorp.com/> (E) david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com> (P) 952.361.6200 From: Alonso [mailto:alons...@gmail.com<mailto:alons...@gmail.com>] Sent: Tuesday, May 31, 2016 11:11 AM To: user@spark.apache.org<mailto:user@spark.apache.org> Subject: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image I have a vmware cloudera image, cdh-5.7 running with centos6.8, i am using OS X as my development machine, and the cdh image to run the code, i upload the code using git to the cdh image, i have modified my /etc/hosts file located in the cdh image with a line like this: 127.0.0.1 quickstart.cloudera quickstart localhost localhost.domain 192.168.30.138 quickstart.cloudera quickstart localhost localhost.domain The cloudera version that i am running is: [cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties # Autogenerated build properties version=2.6.0-cdh5.7.0 git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1 cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76 cloudera.base-branch=cdh5-base-2.6.0 cloudera.build-branch=cdh5-2.6.0_5.7.0 cloudera.pkg.version=2.6.0+cdh5.7.0+1280 cloudera.pkg.release=1.cdh5.7.0.p0.92 cloudera.cdh.release=cdh5.7.0 cloudera.build.time=2016.03.23-18:30:29GMT I can do a ls command in the vmware machine: [cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv -rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 /user/cloudera/ratings.csv I can read its content: [cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l 568454 The code is quite simple, just trying to map its content: val ratingFile="hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv" case class AmazonRating(userId: String, productId: String, rating: Double) val NumRecommendations = 10 val MinRecommendationsPerUser = 10 val MaxRecommendationsPerUser = 20 val MyUsername = "myself" val NumPartitions = 20 println("Using this ratingFile: " + ratingFile) // first create an RDD out of the rating file val rawTrainingRatings = sc.textFile(ratingFile).map { line => val Array(userId, productId, scoreStr) = line.split(",") AmazonRating(userId, productId, scoreStr.toDouble) } // only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => MinRecommendationsPerUser <= r._2.size && r._2.size < 
MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache() println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}") I am getting this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 0 ratings out of 568454 because if i run the exact code within the spark-shell, i got this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 ratings out of 568454 Why is it working fine within the spark-shell but it is not running fine programmatically in the vmware image? I am running the code using sbt-pack plugin to generate unix commands and run them within the vmware image which has the spark pseudocluster, This is the code i use to instantiate the sparkconf: val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector") .setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val ssc = new StreamingContext(sparkConf, Seconds(2)) //this checkpointdir should be in a conf file
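A hedged fragment of what the build.sbt changes might look like (the repository URL and the CDH artifact version are assumptions that should be checked against the cluster's parcel version):

resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.6.0-cdh5.7.0" % "provided",
  "org.apache.spark" %% "spark-sql"       % "1.6.0-cdh5.7.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.0-cdh5.7.0" % "provided"
)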
RE: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image
Is https://github.com/alonsoir/awesome-recommendation-engine/blob/master/build.sbt the build.sbt you are using? David Newberger QA Analyst WAND - The Future of Restaurant Technology (W) www.wandcorp.com<http://www.wandcorp.com/> (E) david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com> (P) 952.361.6200 From: Alonso [mailto:alons...@gmail.com] Sent: Tuesday, May 31, 2016 11:11 AM To: user@spark.apache.org Subject: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image I have a vmware cloudera image, cdh-5.7 running with centos6.8, i am using OS X as my development machine, and the cdh image to run the code, i upload the code using git to the cdh image, i have modified my /etc/hosts file located in the cdh image with a line like this: 127.0.0.1 quickstart.cloudera quickstart localhost localhost.domain 192.168.30.138 quickstart.cloudera quickstart localhost localhost.domain The cloudera version that i am running is: [cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties # Autogenerated build properties version=2.6.0-cdh5.7.0 git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1 cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76 cloudera.base-branch=cdh5-base-2.6.0 cloudera.build-branch=cdh5-2.6.0_5.7.0 cloudera.pkg.version=2.6.0+cdh5.7.0+1280 cloudera.pkg.release=1.cdh5.7.0.p0.92 cloudera.cdh.release=cdh5.7.0 cloudera.build.time=2016.03.23-18:30:29GMT I can do a ls command in the vmware machine: [cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv -rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 /user/cloudera/ratings.csv I can read its content: [cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l 568454 The code is quite simple, just trying to map its content: val ratingFile="hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv" case class AmazonRating(userId: String, productId: String, rating: Double) val NumRecommendations = 10 val MinRecommendationsPerUser = 10 val MaxRecommendationsPerUser = 20 val MyUsername = "myself" val NumPartitions = 20 println("Using this ratingFile: " + ratingFile) // first create an RDD out of the rating file val rawTrainingRatings = sc.textFile(ratingFile).map { line => val Array(userId, productId, scoreStr) = line.split(",") AmazonRating(userId, productId, scoreStr.toDouble) } // only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => MinRecommendationsPerUser <= r._2.size && r._2.size < MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache() println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}") I am getting this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 0 ratings out of 568454 because if i run the exact code within the spark-shell, i got this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 ratings out of 568454 Why is it working fine within the spark-shell but it is not running fine programmatically in the vmware image? 
I am running the code using sbt-pack plugin to generate unix commands and run them within the vmware image which has the spark pseudocluster, This is the code i use to instantiate the sparkconf: val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector") .setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val ssc = new StreamingContext(sparkConf, Seconds(2)) //this checkpointdir should be in a conf file, for now it is hardcoded! val streamingCheckpointDir = "/home/cloudera/my-recommendation-spark-engine/checkpoint" ssc.checkpoint(streamingCheckpointDir) I have tried to use this way of setting spark master, but an exception raises, i suspect that this is symptomatic of my problem. //.setMaster("spark://quickstart.cloudera:7077") The exception when i try to use the fully qualified domain name: .setMaster("spark://quickstart.cloudera:7077") java.io.IOException: Failed to connect to quickstart.cloudera/127.0.0.1:7077<http://127.0.0.1:7077> at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) at org.a
RE: Can not set spark dynamic resource allocation
Hi All, The error you are seeing looks really similar to Spark-13514 to me. I could be wrong though https://issues.apache.org/jira/browse/SPARK-13514 Can you check yarn.nodemanager.local-dirs in your YARN configuration for "file://" Cheers! David Newberger -Original Message- From: Cui, Weifeng [mailto:weife...@a9.com] Sent: Friday, May 20, 2016 4:26 PM To: Marcelo Vanzin Cc: Ted Yu; Rodrick Brown; user; Zhao, Jun; Aulakh, Sahib; Song, Yiwei Subject: Re: Can not set spark dynamic resource allocation Sorry, here is the node-manager log. application_1463692924309_0002 is my test. Hope this will help. http://pastebin.com/0BPEcgcW On 5/20/16, 2:09 PM, "Marcelo Vanzin" wrote: >Hi Weifeng, > >That's the Spark event log, not the YARN application log. You get the >latter using the "yarn logs" command. > >On Fri, May 20, 2016 at 1:14 PM, Cui, Weifeng wrote: >> Here is the application log for this spark job. >> >> http://pastebin.com/2UJS9L4e >> >> >> >> Thanks, >> Weifeng >> >> >> >> >> >> From: "Aulakh, Sahib" >> Date: Friday, May 20, 2016 at 12:43 PM >> To: Ted Yu >> Cc: Rodrick Brown , Cui Weifeng >> , user , "Zhao, Jun" >> >> Subject: Re: Can not set spark dynamic resource allocation >> >> >> >> Yes it is yarn. We have configured spark shuffle service w yarn node >> manager but something must be off. >> >> >> >> We will send u app log on paste bin. >> >> Sent from my iPhone >> >> >> On May 20, 2016, at 12:35 PM, Ted Yu wrote: >> >> Since yarn-site.xml was cited, I assume the cluster runs YARN. >> >> >> >> On Fri, May 20, 2016 at 12:30 PM, Rodrick Brown >> wrote: >> >> Is this Yarn or Mesos? For the later you need to start an external >> shuffle service. >> >> Get Outlook for iOS >> >> >> >> >> >> On Fri, May 20, 2016 at 11:48 AM -0700, "Cui, Weifeng" >> >> wrote: >> >> Hi guys, >> >> >> >> Our team has a hadoop 2.6.0 cluster with Spark 1.6.1. We want to set >> dynamic resource allocation for spark and we followed the following >> link. After the changes, all spark jobs failed. >> >> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-reso >> urce-allocation >> >> This test was on a test cluster which has 1 master machine (running >> namenode, resourcemanager and hive server), 1 worker machine (running >> datanode and nodemanager) and 1 machine as client( running spark shell). >> >> >> >> What I updated in config : >> >> >> >> 1. Update in spark-defaults.conf >> >> spark.dynamicAllocation.enabled true >> >> spark.shuffle.service.enabledtrue >> >> >> >> 2. Update yarn-site.xml >> >> >> >> yarn.nodemanager.aux-services >> mapreduce_shuffle,spark_shuffle >> >> >> >> yarn.nodemanager.aux-services.spark_shuffle.class >> org.apache.spark.network.yarn.YarnShuffleService >> >> >> >> spark.shuffle.service.enabled >> true >> >> >> 3. Copy spark-1.6.1-yarn-shuffle.jar to yarn.application.classpath >> ($HADOOP_HOME/share/hadoop/yarn/*) in python code >> >> 4. Restart namenode, datanode, resourcemanager, nodemanger... retart >> everything >> >> 5. The config will update in all machines, resourcemanager and nodemanager. >> We update the config in one place and copy to all machines. >> >> >> >> What I tested: >> >> >> >> 1. I started a scala spark shell and check its environment variables, >> spark.dynamicAllocation.enabled is true. >> >> 2. 
I used the following code: >> >> scala > val line = >> sc.textFile("/spark-events/application_1463681113470_0006") >> >> line: org.apache.spark.rdd.RDD[String] = >> /spark-events/application_1463681113470_0006 MapPartitionsRDD[1] at >> textFile at :27 >> >> scala > line.count # This command just stuck here >> >> >> >> 3. In the beginning, there is only 1 executor(this is for driver) and >> after line.count, I could see 3 executors, then dropped to 1. >> >> 4. Several jobs were launched and all of them failed. Tasks (for all >&
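For reference, a cleaned-up sketch of the yarn-site.xml pieces discussed in this thread, plus the local-dirs property to double-check (the directory value is only an example):

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<!-- per SPARK-13514, check that this value is a plain path with no file:// scheme -->
<property>
  <name>yarn.nodemanager.local-dirs</name>
  <value>/data/yarn/nm-local-dir</value>
</property>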
RE: Spark replacing Hadoop
Can we assume your question is “Will Spark replace Hadoop MapReduce?” or do you literally mean replacing the whole of Hadoop? David From: Ashok Kumar [mailto:ashok34...@yahoo.com.INVALID] Sent: Thursday, April 14, 2016 2:13 PM To: User Subject: Spark replacing Hadoop Hi, I hear some saying that Hadoop is getting old and out of date and will be replaced by Spark! Does this make sense, and if so, how accurate is it? Best
RE: DStream how many RDD's are created by batch
Hi Natu, I believe you are correct one RDD would be created for each file. Cheers, David From: Natu Lauchande [mailto:nlaucha...@gmail.com] Sent: Tuesday, April 12, 2016 1:48 PM To: David Newberger Cc: user@spark.apache.org Subject: Re: DStream how many RDD's are created by batch Hi David, Thanks for you answer. I have a follow up question : I am using textFileStream , and listening in an S3 bucket for new files to process. Files are created every 5 minutes and my batch interval is 2 minutes . Does it mean that each file will be for one RDD ? Thanks, Natu On Tue, Apr 12, 2016 at 7:46 PM, David Newberger mailto:david.newber...@wandcorp.com>> wrote: Hi, Time is usually the criteria if I’m understanding your question. An RDD is created for each batch interval. If your interval is 500ms then an RDD would be created every 500ms. If it’s 2 seconds then an RDD is created every 2 seconds. Cheers, David From: Natu Lauchande [mailto:nlaucha...@gmail.com<mailto:nlaucha...@gmail.com>] Sent: Tuesday, April 12, 2016 7:09 AM To: user@spark.apache.org<mailto:user@spark.apache.org> Subject: DStream how many RDD's are created by batch Hi, What's the criteria for the number of RDD's created for each micro bath iteration ? Thanks, Natu
RE: DStream how many RDD's are created by batch
Hi, Time is usually the criterion, if I’m understanding your question. An RDD is created for each batch interval. If your interval is 500ms then an RDD would be created every 500ms. If it’s 2 seconds then an RDD is created every 2 seconds. Cheers, David From: Natu Lauchande [mailto:nlaucha...@gmail.com] Sent: Tuesday, April 12, 2016 7:09 AM To: user@spark.apache.org Subject: DStream how many RDD's are created by batch Hi, What's the criteria for the number of RDD's created for each micro batch iteration? Thanks, Natu
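A tiny sketch tying the interval to the RDDs (the directory and interval are placeholders):

import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sparkConf, Minutes(2))   // batch interval of 2 minutes
// each 2-minute batch yields an RDD built from the files that appeared during that window
val lines = ssc.textFileStream("s3n://my-bucket/incoming/")
lines.foreachRDD(rdd => println(s"records in this batch: ${rdd.count()}"))
ssc.start()
ssc.awaitTermination()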
Using Experimental Spark Features
Hi All, I've been looking at the Direct Approach for streaming Kafka integration (http://spark.apache.org/docs/latest/streaming-kafka-integration.html) because it looks like a good fit for our use cases. My concern is that the feature is experimental according to the documentation. Has anyone used this approach yet, and if so, what has your experience been with using it? If it helps, we'd be looking to implement it using Scala. Secondly, in general, what has people's experience been with using experimental features in Spark? Cheers, David Newberger
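For context, a hedged Scala sketch of the direct approach as described on that page for the Kafka 0.8 integration (the brokers, topic, and the surrounding StreamingContext are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my-topic")

// no receivers; offsets are tracked by the stream itself rather than by ZooKeeper
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

directStream.map(_._2).foreachRDD(rdd => println(s"messages in batch: ${rdd.count()}"))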
RE: fishing for help!
Hi Eran, Based on the limited information, the first things that come to mind are processor, RAM, and disk speed. David Newberger QA Analyst WAND - The Future of Restaurant Technology (W) www.wandcorp.com (E) david.newber...@wandcorp.com (P) 952.361.6200 From: Eran Witkon [mailto:eranwit...@gmail.com] Sent: Monday, December 21, 2015 6:54 AM To: user Subject: fishing for help! Hi, I know it is a broad question, but can you think of reasons why a PySpark job which runs from server 1 using user 1 will run faster than the same job when running on server 2 with user 1? Eran