Re: Spark streaming driver java process RSS memory constantly increasing using cassandra driver
Just bumping the issue I am having, in case anyone can provide direction; I have been stuck on this for a while now.

Thanks,
Conor

On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell <conorapa...@gmail.com> wrote:
> Hi,
>
> I have a memory leak in the Spark driver which is not in the heap or the non-heap.
> Even though neither of those is increasing, the java process RSS memory is, and it eventually takes up all the memory on the machine.
> I am using Spark 1.5.2 and the spark-cassandra-connector 1.5.0-M2.
>
> I have reduced the leak to the code below.
> If I remove Cassandra from the code below, the memory leak does not happen.
> Can someone please explain why this is happening, or what I can do to investigate it further?
> I have also included pics from JConsole covering a couple of hours, and Datadog showing the RSS memory increase over the same timeframe.
>
> Thanks,
> Conor
>
>     val ssc = new StreamingContext(sparkConf, Seconds(SparkStreamingBatchInterval))
>
>     ssc.checkpoint(HdfsNameNodeUriPath)
>
>     val kafkaParams = Map[String, String](METADATA_BROKER_LIST -> MetadataBrokerList)
>
>     var kafkaMessages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, KafkaTopics.split(DELIMITER).toSet)
>
>     var eventBuckets = kafkaMessages.map(keyMessage => {
>       implicit val formats = DefaultFormats.lossless
>       val eventBucket = parse(keyMessage._2)
>       val minute = new Date((eventBucket \ MINUTE).extract[Long])
>       val business = (eventBucket \ BUSINESS).extract[String]
>       val account = (eventBucket \ ACCOUNT).extract[String]
>       (minute, business, account)
>     })
>
>     var eventsToBeProcessed = eventBuckets.transform(rdd =>
>       rdd.joinWithCassandraTable(
>         "analytics_events" + '_' + settings.JobEnvironment,
>         "events",
>         SomeColumns(MINUTE, BUSINESS, ACCOUNT, EVENT, JSON),
>         SomeColumns(MINUTE, BUSINESS, ACCOUNT)
>       ).filter(entry => {
>         // remove any entries without a result
>         entry._2.length > 0
>       }))
>
>     eventsToBeProcessed.foreachRDD(rdd => {
>       println(rdd.take(1))
>     })
>
>     sys.ShutdownHookThread {
>       System.err.println(s"Gracefully stopping $JobName Spark Streaming Application")
>       ssc.stop(stopSparkContext = true, stopGracefully = true)
>       System.err.println(s"$JobName streaming job stopped")
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
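One way to confirm where off-heap growth like this is coming from, not discussed in the thread itself, is HotSpot Native Memory Tracking. The sketch below only documents the relevant property name; the application name is illustrative, and for the driver the flag usually has to be supplied at launch (for example via --driver-java-options or spark-defaults.conf) because the driver JVM is already running by the time application code sets a SparkConf in client mode. With the flag enabled, "jcmd <driver-pid> VM.native_memory summary" shows which native category (Thread, Internal, direct buffers, and so on) is growing while RSS climbs.

    import org.apache.spark.SparkConf

    // Sketch only: enable Native Memory Tracking on the driver JVM so native
    // allocations can be sampled with jcmd while the job runs.
    val sparkConf = new SparkConf()
      .setAppName("rss-leak-investigation") // illustrative name
      .set("spark.driver.extraJavaOptions", "-XX:NativeMemoryTracking=summary")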
Re: Spark streaming driver java process RSS memory constantly increasing using cassandra driver
Hi Abhijeet,

Thanks for pointing out that the pics are not showing. I have put all the images in this public Google document: https://docs.google.com/document/d/1xEJ0eTtXBlSso6SshLCWZHcRw4aEQMJflzKBsr2FHaw/edit?usp=sharing

All the code is in the first email; there is nothing else starting up threads except the 80 or so threads Spark starts up. By the heap I mean CMS Old Gen, Par Eden Space and Par Survivor Space; by the non-heap I mean Code Cache and CMS Perm Gen. Those are the JVM memory spaces. In the document you will see I am quite aggressive with the JVM options.

It does indicate that native memory is leaking, but I am at a loss as to how to properly investigate it, and it looks like it is in the Cassandra driver itself or a combination of how Spark is running it and the driver.

It is also happening when running this within a foreachRDD:

    val cluster = Cluster.builder().addContactPoints(CassandraHostname.split(DELIMITER): _*).build()
    val session = cluster.connect()

    ranges.foreach(range => {
      session.execute(s"INSERT INTO analytics_metadata_$JobEnvironment.kafka_offsets (topic, job_name, batch_time, partition, from_offset, until_offset) VALUES ('${range._1}','${range._2}',${range._3.getTime()},${range._4},${range._5},${range._6})")
    })

    session.close()
    cluster.close()

Thanks,
Conor

On Mon, Dec 14, 2015 at 2:12 PM, Singh, Abhijeet <absi...@informatica.com> wrote:
> Hi Conor,
>
> What do you mean when you say the leak is not in "Heap or non-Heap"? If it is not heap related then it has to be the native memory that is leaking. I can't say for sure, but you do have threads working there and they could be using the native memory. We didn't get any pics of JConsole.
>
> Thanks.
>
> -Original Message-
> From: Conor Fennell [mailto:conorapa...@gmail.com]
> Sent: Monday, December 14, 2015 4:15 PM
> To: user@spark.apache.org
> Subject: Re: Spark streaming driver java process RSS memory constantly increasing using cassandra driver
>
> Just bumping the issue I am having, if anyone can provide direction? I have been stuck on this for a while now.
>
> Thanks,
> Conor
>
> On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell <conorapa...@gmail.com> wrote:
>> [original message and code quoted in full above]
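The per-batch Cluster/Session construction in the snippet above is one place native resources can accumulate if a batch ever skips the close calls. The spark-cassandra-connector ships a CassandraConnector helper that caches and reuses the underlying Cluster and Session, so an alternative is sketched below. It is only a sketch: sparkConf, eventsToBeProcessed, ranges and JobEnvironment are taken from the earlier snippets, and whether this removes the RSS growth was not established in the thread.

    import com.datastax.spark.connector.cql.CassandraConnector

    val connector = CassandraConnector(sparkConf)

    eventsToBeProcessed.foreachRDD { rdd =>
      // Borrow a connector-managed session instead of building a new Cluster every batch.
      connector.withSessionDo { session =>
        ranges.foreach { range =>
          session.execute(
            s"INSERT INTO analytics_metadata_$JobEnvironment.kafka_offsets " +
              "(topic, job_name, batch_time, partition, from_offset, until_offset) " +
              s"VALUES ('${range._1}', '${range._2}', ${range._3.getTime()}, ${range._4}, ${range._5}, ${range._6})")
        }
      }
    }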
Sporadic error after moving from kafka receiver to kafka direct stream
Hi,

Firstly, I want to say a big thanks to Cody for contributing the Kafka direct stream. I have been using the receiver-based approach for months, but the direct stream is a much better solution for my use case.

The job in question is now ported over to the direct stream, doing idempotent outputs to Cassandra and outputting to Kafka. I am also saving the offsets to Cassandra.

But unfortunately I am sporadically getting the error below. It recovers and continues, but it gives a large spike in the processing delay, and it can happen every 3 or 4 batches. I still have other receiver jobs running and they never throw these exceptions.

I would be very appreciative of any direction, and I can happily provide more detail.

Thanks,
Conor

15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not found, computing it
15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events, partition 0 offsets 13630747 -> 13633001
15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id is overridden to
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to
15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in stage 654.0 (TID 5242)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/10/22 13:31:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5243
15/10/22 13:31:00 INFO executor.Executor: Running task 1.0 in stage 654.0 (TID 5243)
Sporadic error after moving from kafka receiver to kafka direct stream
Hi,

Firstly, I want to say a big thanks to Cody for contributing the Kafka direct stream. I have been using the receiver-based approach for months, but the direct stream is a much better solution for my use case.

The job in question is now ported over to the direct stream, doing idempotent outputs to Cassandra and outputting to Kafka. I am also saving the offsets to Cassandra.

But unfortunately I am sporadically getting the error below. It recovers and continues, but it gives a large spike in the processing delay, and it can happen every 3 or 4 batches. I still have other receiver jobs running and they never throw these exceptions.

I would be very appreciative of any direction, and I can happily provide more detail.

Thanks,
Conor

15/10/21 23:30:31 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
15/10/21 23:31:01 ERROR executor.Executor: Exception in task 6.0 in stage 66.0 (TID 406)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/10/21 23:31:01 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 407
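A mitigation sometimes suggested for sporadic ClosedChannelException with the direct stream, though not confirmed anywhere in this thread, is to give the consumer more room to reconnect and the job more room to retry, via the Kafka 0.8 consumer properties passed in kafkaParams plus a couple of Spark settings. The property names below are real; the values are assumptions, and whether this removes the processing-delay spike depends on the root cause (for example a broker closing idle connections versus a partition leader moving).

    import org.apache.spark.SparkConf

    // Sketch only; values are illustrative.
    val sparkConf = new SparkConf()
      .set("spark.streaming.kafka.maxRetries", "3") // driver-side offset lookups
      .set("spark.task.maxFailures", "8")           // let the failed fetch task be retried

    val kafkaParams = Map[String, String](
      "metadata.broker.list"      -> MetadataBrokerList, // from the earlier snippets
      "socket.timeout.ms"         -> "60000",
      "refresh.leader.backoff.ms" -> "1000"
    )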
Re: Streaming: updating broadcast variables
Hi James,

The code below shows one way you can update the broadcast variable on the executors:

    // ... events stream setup
    var startTime = new Date().getTime()
    var hashMap = HashMap(1 -> (1, 1), 2 -> (2, 2))
    var hashMapBroadcast = stream.context.sparkContext.broadcast(hashMap)

    val TWO_MINUTES = 1000 * 60 * 2 // milliseconds

    // eventStream is a DStream
    eventStream.foreachRDD(rdd => {
      // Executed on the driver, not the executors
      if (new Date().getTime() - startTime > TWO_MINUTES) {
        // remove old broadcast variable
        hashMapBroadcast.unpersist()
        // create a new one
        hashMapBroadcast = stream.context.sparkContext.broadcast(HashMap(1 -> (1, 1000), 2 -> (2, 2000)))
      }
    })

    val broadcastValuesFromStream = activitiesByVisitKey.map(activity => hashMapBroadcast.value(1))

    // should print (1, 1000) after 2 minutes when updated
    broadcastValuesFromStream.print()

Regards,
Conor

On Fri, Jul 3, 2015 at 4:24 PM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:
> You cannot update the broadcasted variable. It won't get reflected on the workers.
>
> On Jul 3, 2015 12:18 PM, James Cole <ja...@binarism.net> wrote:
>> Hi all,
>>
>> I'm filtering a DStream using a function. I need to be able to change this function while the application is running (I'm polling a service to see if a user has changed their filtering). The filter function is a transformation and runs on the workers, so that's where the updates need to go. I'm not sure of the best way to do this.
>>
>> Initially broadcasting seemed like the way to go: the filter is actually quite large. But I don't think I can update something I've broadcasted. I've tried unpersisting and re-creating the broadcast variable, but it became obvious this wasn't updating the reference on the worker.
>>
>> So am I correct in thinking I can't use broadcasted variables for this purpose?
>>
>> The next option seems to be: stopping the JavaStreamingContext, creating a new one from the SparkContext, updating the filter function, and re-creating the DStreams (I'm using direct streams from Kafka). If I re-created the JavaStreamingContext, would the accumulators (which are created from the SparkContext) keep working? (Obviously I'm going to try this soon.)
>>
>> In summary:
>> 1) Can broadcasted variables be updated?
>> 2) Is there a better way than re-creating the JavaStreamingContext and DStreams?
>>
>> Thanks,
>> James
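Applied to the original filtering question, the same driver-side rebroadcast trick might look roughly like the sketch below. It is only a sketch: loadFilter() stands in for whatever call polls the service for the latest filter set, ssc and eventStream (assumed here to be a DStream[String]) come from the surrounding job, and the two-minute refresh interval is arbitrary. The important details are that the check runs on the driver inside transform or foreachRDD for every batch, and that the closure shipped to the executors captures the freshly created Broadcast handle.

    // Hypothetical helper that fetches the current filter values from the service.
    def loadFilter(): Set[String] = ???

    var filterBroadcast = ssc.sparkContext.broadcast(loadFilter())
    var lastRefresh = System.currentTimeMillis()
    val refreshIntervalMs = 2 * 60 * 1000L

    val filtered = eventStream.transform { rdd =>
      // Runs on the driver for each batch, before the closure below is serialized.
      if (System.currentTimeMillis() - lastRefresh > refreshIntervalMs) {
        filterBroadcast.unpersist()
        filterBroadcast = ssc.sparkContext.broadcast(loadFilter())
        lastRefresh = System.currentTimeMillis()
      }
      val current = filterBroadcast // capture the handle, not the mutable var, in the closure
      rdd.filter(event => current.value.contains(event))
    }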
Re: Driver memory leak?
The memory leak could be related to this defect, which was resolved in Spark 1.2.2 and 1.3.0: https://issues.apache.org/jira/browse/SPARK-5967. It was also a HashMap causing the issue there.

-Conor

On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen <so...@cloudera.com> wrote:
> Please use user@, not dev@
>
> This message does not appear to be from your driver. It also doesn't say you ran out of memory. It says you didn't tell YARN to let it use the memory you want. Look at the memory overhead param and please search first for related discussions.
>
> On Apr 29, 2015 11:43 AM, wyphao.2007 <wyphao.2...@163.com> wrote:
>> Hi, dear developers,
>>
>> I am using Spark Streaming to read data from Kafka. The program had already run for about 120 hours, but today it failed because of a driver OOM, as follows:
>>
>> Container [pid=49133,containerID=container_1429773909253_0050_02_01] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 3.2 GB of 50 GB virtual memory used. Killing container.
>>
>> I set --driver-memory to 2g. In my mind, the driver is responsible for job scheduling and job monitoring (please correct me if I'm wrong); why is it using so much memory?
>>
>> So I used jmap to monitor another program (already running for about 48 hours): sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256. In the result, the java.util.HashMap$Entry and java.lang.Long objects are using about 600 MB of memory!
>>
>> I also used jmap to monitor another program (running for about 1 hour), and there the java.util.HashMap$Entry and java.lang.Long objects are not using so much memory. But I found that, as time goes by, the java.util.HashMap$Entry and java.lang.Long objects occupy more and more memory. Is it a driver memory leak, or is there another reason?
>>
>> Thanks
>> Best Regards
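The "memory overhead param" Sean refers to is the YARN overhead that is added on top of the requested driver or executor heap to form the container limit. A minimal sketch of raising it is below; the property names are the Spark 1.x YARN settings, while the values are only illustrative and need tuning for the job.

    import org.apache.spark.SparkConf

    // Container limit = heap + overhead, so with only the default overhead a 2g
    // driver heap plus JVM/native usage can exceed the 2.5 GB container limit above.
    val conf = new SparkConf()
      .set("spark.driver.memory", "2g")
      .set("spark.yarn.driver.memoryOverhead", "1024")   // MB
      .set("spark.yarn.executor.memoryOverhead", "1024") // MB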
Re: Spark-1.2.2-bin-hadoop2.4.tgz missing
I am looking for that build too.

-Conor

On Mon, Apr 20, 2015 at 9:18 AM, Marius Soutier <mps@gmail.com> wrote:
> Same problem here...
>
> On 20.04.2015, at 09:59, Zsolt Tóth <toth.zsolt@gmail.com> wrote:
>> Hi all,
>>
>> it looks like the 1.2.2 pre-built version for hadoop2.4 is not available on the mirror sites. Am I missing something?
>>
>> Regards,
>> Zsolt
Spark streaming job throwing ClassNotFound exception when recovering from checkpointing
I am getting the following error when I kill the Spark driver and restart the job:

15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
java.io.IOException: java.lang.ClassNotFoundException: com.example.spark.streaming.reporting.live.jobs.Bucket
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
        at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)

The Spark version is 1.2.0.

The streaming job executes every 10 seconds with the following steps:

1. Consuming JSON from a kafka topic called journeys and converting it to case classes
2. Filtering the resulting journeys stream based on a time attribute being set
3. FlatMapping journeys into separate time buckets of 15, 60 and 1440 minutes, e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000, hyperLogLog(journey id), 360))
4. ReduceByKey adding the hyperloglogs
5. UpdateStateByKey to add to the previous state's hyperloglog
6. Then outputting the results to Cassandra

I have made a sample app below to mimic the problem and put all classes into one file; it is also attached to this email.

To get around the issue for the moment, I have removed the Bucket class and stopped passing a bucket array into the ActiveJourney class; instead I hard-code all the time buckets I need in the ActiveJourney class. This approach works and recovers from checkpointing, but it is not extensible.

Can the Spark gurus explain why I get that ClassNotFound exception?

Need any more information, please let me know.

Much thanks,
Conor

    package com.example.spark.streaming.reporting.live.jobs

    import java.util.Date

    import scala.Array.canBuildFrom
    import scala.collection.mutable.MutableList

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    import org.json4s.DefaultFormats
    import org.json4s.jackson.JsonMethods.parse
    import org.json4s.jvalue2extractable
    import org.json4s.string2JsonInput

    import com.example.spark.streaming.utils.MilliSecondUtils
    import com.example.spark.streaming.utils.constants.ColumnFamilies
    import com.example.spark.streaming.utils.constants.Constants
    import com.example.spark.streaming.utils.constants.Milliseconds
    import com.example.spark.streaming.utils.constants.SparkConfig

    import com.datastax.spark.connector.SomeColumns
    import com.datastax.spark.connector.streaming.toDStreamFunctions
    import com.datastax.spark.connector.toNamedColumnRef

    import com.twitter.algebird.HLL
    import com.twitter.algebird.HyperLogLogMonoid

    // Json parsing classes
    case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
    case class JourneyDetails(_id: String)
    case class JourneyCommand($set: Option[JourneySet])
    case class JourneySet(awayAt: Date)

    case class Bucket(val bucketType: String, val roundDown: (Long) => Long, val columnFamily: String, val size: Long, val maxIntervals: Int)
    case class ActiveState(var bucketType: String, var time: Long, var hyperLogLog: HLL, var ttl: Int)

    object SampleJob {

      private final val Name = this.getClass().getSimpleName()

      def main(args: Array[String]) {
        if (args.length < 8) {
          System.err.println(s"Usage: $Name <environment> <zkQuorum> <group> <topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>")
          System.exit(1)
        }
        System.out.print(args)

        val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri, cassandra, intervalSeconds) = args

        val checkpointDirectory = hdfsUri + "/reporting/" + Name + getClass().getPackage().getImplementationVersion()

        def functionToCreateContext(): StreamingContext = {

          // how many buckets
          val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
          val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour, ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
          val day = Bucket("DAY", MilliSecondUtils.roundDownToNearestDay, ColumnFamilies.Visits_1440, Milliseconds.Day, 8640)

          val activeJourneys = new ActiveJourney(Array(fifteen, hour, day))

          val sparkConf = new SparkConf()
            .setAppName(Name)
            .set(SparkConfig.SparkMesosCoarse, Constants.True)
            .set(SparkConfig.SparkCleanerTtl, "300")
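For context on where that deserialization happens, the usual recovery pattern with a factory like functionToCreateContext is StreamingContext.getOrCreate, sketched below with the names from the message above. On a restart, the whole DStream graph, including anything referencing Bucket, is deserialized from the checkpoint before any user code runs, so the jar submitted for the restarted job has to contain every class the graph refers to; a ClassNotFoundException at that point means the class could not be found on the driver classpath while the checkpoint was being read.

    import org.apache.spark.streaming.StreamingContext

    // Sketch only, reusing checkpointDirectory and functionToCreateContext from the
    // sample job above: the factory runs on a clean start, while on restart Spark
    // rebuilds the StreamingContext from the serialized checkpoint in HDFS.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()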