Re: Spark streaming driver java process RSS memory constantly increasing using cassandra driver

2015-12-14 Thread Conor Fennell
Just bumping the issue I am having, in case anyone can provide direction; I
have been stuck on this for a while now.

Thanks,
Conor

On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell <conorapa...@gmail.com> wrote:
> Hi,
>
> I have a memory leak in the Spark driver which is not in the heap or
> the non-heap.
> Even though neither of these is increasing, the Java process RSS memory
> is, and it eventually takes up all the memory on the machine.
> I am using Spark 1.5.2 and the spark-cassandra-connector 1.5.0-M2.
>
> I have reduced the leak to the code below.
> If I remove Cassandra from the code below, the memory leak does not happen.
> Can someone please explain why this is happening, or what I can do to
> investigate it further?
> I have also included pics from jconsole covering a couple of hours, and
> from datadog showing the RSS memory increase over the same timeframe.
>
> Thanks,
> Conor
>
> val ssc = new StreamingContext(sparkConf, Seconds(SparkStreamingBatchInterval))
>
> ssc.checkpoint(HdfsNameNodeUriPath)
>
> val kafkaParams = Map[String, String](METADATA_BROKER_LIST -> MetadataBrokerList)
>
> var kafkaMessages = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, KafkaTopics.split(DELIMITER).toSet)
>
> var eventBuckets = kafkaMessages.map(keyMessage => {
>   implicit val formats = DefaultFormats.lossless
>   val eventBucket = parse(keyMessage._2)
>   val minute = new Date((eventBucket \ MINUTE).extract[Long])
>   val business = (eventBucket \ BUSINESS).extract[String]
>   val account = (eventBucket \ ACCOUNT).extract[String]
>   (minute, business, account)
> })
>
> var eventsToBeProcessed = eventBuckets.transform(rdd =>
>   rdd.joinWithCassandraTable(
>       "analytics_events" + '_' + settings.JobEnvironment,
>       "events",
>       SomeColumns(MINUTE, BUSINESS, ACCOUNT, EVENT, JSON),
>       SomeColumns(MINUTE, BUSINESS, ACCOUNT))
>     .filter(entry => {
>       // remove any entries without a result
>       entry._2.length > 0
>     }))
>
> eventsToBeProcessed.foreachRDD(rdd => {
>   println(rdd.take(1))
> })
>
> sys.ShutdownHookThread {
>   System.err.println(s"Gracefully stopping $JobName Spark Streaming Application")
>   ssc.stop(stopSparkContext = true, stopGracefully = true)
>   System.err.println(s"$JobName streaming job stopped")
> }
>
> ssc.start()
> ssc.awaitTermination()

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming driver java process RSS memory constantly increasing using cassandra driver

2015-12-14 Thread Conor Fennell
Hi Abhijeet,

Thanks for pointing out the pics are not showing.
I have put all the images in this public google document:
https://docs.google.com/document/d/1xEJ0eTtXBlSso6SshLCWZHcRw4aEQMJflzKBsr2FHaw/edit?usp=sharing

All the code is in the first email; there is nothing else starting up
threads except the 80 or so threads Spark starts up.
By the heap I mean: CMS Old Gen, Par Eden Space and Par Survivor Space.
By the non-heap I mean: Code Cache and CMS Perm Gen.
Together these are the JVM memory spaces.
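
For reference, the same pools can be dumped from inside the driver with the
standard java.lang.management API (a minimal sketch, not part of the job code),
which makes it easy to line the jconsole numbers up against the RSS graph:

import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

// Print every JVM memory pool (CMS Old Gen, Par Eden/Survivor, Code Cache,
// CMS Perm Gen) so the figures can be compared with jconsole and datadog.
ManagementFactory.getMemoryPoolMXBeans.asScala.foreach { pool =>
  val usage = pool.getUsage
  println(s"${pool.getName}: used=${usage.getUsed} committed=${usage.getCommitted} max=${usage.getMax}")
}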

In the document you will see I am quite aggressive with the JVM options.

It does indicate that native memory is leaking, but I am at a loss as to how
to properly investigate it; it looks like the leak is in the Cassandra
driver itself, or in some combination of how Spark is running it and the
driver.
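
One way to dig further (a sketch using standard JDK tooling rather than anything
from this thread) is JVM Native Memory Tracking, which breaks down the native
memory the JVM itself allocates (thread stacks, code cache, GC structures, direct
buffers); it does not account for memory malloc'd directly by native libraries,
but it narrows the search:

import org.apache.spark.SparkConf

// Native Memory Tracking (recent JDKs). For the driver this flag has to be
// supplied at submit time (e.g. via --conf or spark-defaults), since it cannot
// take effect after the driver JVM has already started.
val sparkConf = new SparkConf()
  .set("spark.driver.extraJavaOptions", "-XX:NativeMemoryTracking=summary")

// Then, on the driver host, while RSS is climbing:
//   jcmd <driver-pid> VM.native_memory baseline
//   jcmd <driver-pid> VM.native_memory summary.diff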

It is also happening when running this within a foreachRDD:

  val cluster = Cluster.builder()
    .addContactPoints(CassandraHostname.split(DELIMITER): _*)
    .build()
  val session = cluster.connect()

  ranges.foreach(range => {
    session.execute(
      s"INSERT INTO analytics_metadata_$JobEnvironment.kafka_offsets " +
      s"(topic, job_name, batch_time, partition, from_offset, until_offset) VALUES " +
      s"('${range._1}','${range._2}',${range._3.getTime()},${range._4},${range._5},${range._6})")
  })

  session.close()
  cluster.close()
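
One thing worth trying here (a sketch, assuming the spark-cassandra-connector's
CassandraConnector API that ships with 1.5.0-M2; not a confirmed fix for the RSS
growth) is to let the connector manage the session instead of building a new
Cluster and Session on every batch, since each Cluster carries its own native
NIO/netty resources:

import com.datastax.spark.connector.cql.CassandraConnector

// Reuse the connector's pooled sessions, configured from the same SparkConf,
// rather than creating and tearing down a Cluster per batch.
val connector = CassandraConnector(ssc.sparkContext.getConf)

connector.withSessionDo { session =>
  ranges.foreach { range =>
    session.execute(
      s"INSERT INTO analytics_metadata_$JobEnvironment.kafka_offsets " +
      s"(topic, job_name, batch_time, partition, from_offset, until_offset) VALUES " +
      s"('${range._1}','${range._2}',${range._3.getTime()},${range._4},${range._5},${range._6})")
  }
}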


Thanks,
Conor



On Mon, Dec 14, 2015 at 2:12 PM, Singh, Abhijeet
<absi...@informatica.com> wrote:
> Hi Conor,
>
> What do you mean when you say the leak is not in "Heap or non-Heap"? If it is not
> heap related, then it has to be native memory that is leaking. I can't say
> for sure, but you do have threads working there and they could be using
> native memory. We didn't get any pics of JConsole.
>
> Thanks.
>
> -Original Message-
> From: Conor Fennell [mailto:conorapa...@gmail.com]
> Sent: Monday, December 14, 2015 4:15 PM
> To: user@spark.apache.org
> Subject: Re: Spark streaming driver java process RSS memory constantly 
> increasing using cassandra driver
>
> Just bumping the issue I am having, in case anyone can provide direction; I have
> been stuck on this for a while now.
>
> Thanks,
> Conor
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-22 Thread Conor Fennell
Hi,

Firstly, I want to say a big thanks to Cody for contributing the Kafka
direct stream.

I have been using the receiver-based approach for months, but the direct
stream is a much better solution for my use case.

The job in question is now ported over to the direct stream, doing
idempotent outputs to Cassandra and also outputting to Kafka.
I am also saving the offsets to Cassandra (roughly as sketched below).
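
The offsets come off the direct stream via the HasOffsetRanges cast from the
kafka direct API; a minimal sketch (the variable name kafkaMessages is assumed,
and this is not the actual job code):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// The cast has to happen on the RDD handed to foreachRDD (the first RDD in the
// chain), on the driver, before any transformation changes the partitioning.
kafkaMessages.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch, then persist each range's
  // (topic, partition, fromOffset, untilOffset) to the offsets table ...
}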

But unfortunately I am sporadically getting the error below.
It recovers and continues, but it causes a large spike in the processing
delay, and it can happen every 3 or 4 batches.
I still have other receiver jobs running and they never throw these exceptions.

I would be very appreciative of any direction and can happily
provide more detail.

Thanks,
Conor

15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not found, computing it
15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events, partition 0 offsets 13630747 -> 13633001
15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id is overridden to
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to
15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in stage 654.0 (TID 5242)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/22 13:31:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5243
15/10/22 13:31:00 INFO executor.Executor: Running task 1.0 in stage 654.0 (TID 5243)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-21 Thread Conor Fennell
Hi,

Firstly, I want to say a big thanks to Cody for contributing the Kafka direct
stream.

I have been using the receiver-based approach for months, but the direct
stream is a much better solution for my use case.

The job in question is now ported over to the direct stream, doing
idempotent outputs to Cassandra and also outputting to Kafka.
I am also saving the offsets to Cassandra.

But unfortunately I am sporadically getting the error below.
It recovers and continues, but it causes a large spike in the processing delay,
and it can happen every 3 or 4 batches.
I still have other receiver jobs running and they never throw these
exceptions.

I would be very appreciative of any direction and can happily provide
more detail.

Thanks,
Conor

15/10/21 23:30:31 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
15/10/21 23:31:01 ERROR executor.Executor: Exception in task 6.0 in stage 66.0 (TID 406)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/21 23:31:01 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 407


Re: Streaming: updating broadcast variables

2015-07-06 Thread Conor Fennell
Hi James,

The code below shows one way you can update the broadcast variable on the
executors:

// ... events stream setup

import scala.collection.immutable.HashMap

var startTime = new Date().getTime()

var hashMap = HashMap(1 -> (1, 1), 2 -> (2, 2))

var hashMapBroadcast = stream.context.sparkContext.broadcast(hashMap)

val TWO_MINUTES = 120000 // two minutes in milliseconds

// eventStream is a DStream
eventStream.foreachRDD(rdd => {

  // Executed on the driver, not the executors
  if (new Date().getTime() - startTime > TWO_MINUTES) {

    // remove the old broadcast variable
    hashMapBroadcast.unpersist()

    // create a new one
    hashMapBroadcast = stream.context.sparkContext.broadcast(HashMap(1 -> (1, 1000), 2 -> (2, 2000)))
  }
})

val broadcastValuesFromStream = activitiesByVisitKey.map(activity => hashMapBroadcast.value(1))

// should print (1, 1000) after 2 minutes, once updated
broadcastValuesFromStream.print()


Regards,

Conor




On Fri, Jul 3, 2015 at 4:24 PM, Raghavendra Pandey 
raghavendra.pan...@gmail.com wrote:

 You cannot update the broadcast variable; it won't get reflected on the
 workers.
 On Jul 3, 2015 12:18 PM, James Cole ja...@binarism.net wrote:

 Hi all,

 I'm filtering a DStream using a function. I need to be able to change
 this function while the application is running (I'm polling a service to
 see if a user has changed their filtering). The filter function is a
 transformation and runs on the workers, so that's where the updates need to
 go. I'm not sure of the best way to do this.

 Initially broadcasting seemed like the way to go: the filter is actually
 quite large. But I don't think I can update something I've broadcasted.
 I've tried unpersisting and re-creating the broadcast variable but it
 became obvious this wasn't updating the reference on the worker. So am I
 correct in thinking I can't use broadcasted variables for this purpose?

 The next option seems to be: stopping the JavaStreamingContext, creating
 a new one from the SparkContext, updating the filter function, and
 re-creating the DStreams (I'm using direct streams from Kafka).

 If I re-created the JavaStreamingContext would the accumulators (which
 are created from the SparkContext) keep working? (Obviously I'm going to
 try this soon)

 In summary:

 1) Can broadcasted variables be updated?

 2) Is there a better way than re-creating the JavaStreamingContext and
 DStreams?

 Thanks,

 James




Re: Driver memory leak?

2015-04-29 Thread Conor Fennell
The memory leak could be related to this defect, which was resolved in
Spark 1.2.2 and 1.3.0: https://issues.apache.org/jira/browse/SPARK-5967

In that case it was also a HashMap causing the issue.

-Conor



On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote:

 Please use user@, not dev@

 This message does not appear to be from your driver. It also doesn't say
 you ran out of memory. It says you didn't tell YARN to let it use the
 memory you want. Look at the memory overhead param and please search first
 for related discussions.
 On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote:

 Hi, Dear developer, I am using Spark Streaming to read data from kafka;
 the program has already run for about 120 hours, but today it failed
 because of the driver's OOM, as follows:

 Container [pid=49133,containerID=container_1429773909253_0050_02_01]
 is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
 physical memory used; 3.2 GB of 50 GB virtual memory used. Killing
 container.

 I set --driver-memory to 2g. In my mind, the driver is responsible for
 job scheduling and job monitoring (please correct me if I'm wrong), so
 why is it using so much memory?

 So I used jmap to monitor another program (which had already run for
 about 48 hours): sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256.
 The result: the java.util.HashMap$Entry and java.lang.Long objects were
 using about 600Mb of memory!

 I also used jmap to monitor another program (running for about 1 hour);
 there the java.util.HashMap$Entry and java.lang.Long objects don't use
 nearly as much memory. But I found that, as time goes by, the
 java.util.HashMap$Entry and java.lang.Long objects occupy more and more
 memory. Is this a driver memory leak, or is there another reason?

 Thanks
 Best Regards
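
For reference, the memory overhead Sean points to is the off-heap headroom YARN
allows on top of the JVM heap; a minimal sketch with the Spark 1.x-on-YARN
property names (the values are illustrative only, not a recommendation):

import org.apache.spark.SparkConf

// Off-heap headroom in MB, added on top of --driver-memory / executor memory.
val conf = new SparkConf()
  .set("spark.yarn.driver.memoryOverhead", "512")
  .set("spark.yarn.executor.memoryOverhead", "512")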



Re: Spark-1.2.2-bin-hadoop2.4.tgz missing

2015-04-20 Thread Conor Fennell
I am looking for that build too.

-Conor

On Mon, Apr 20, 2015 at 9:18 AM, Marius Soutier mps@gmail.com wrote:

 Same problem here...

  On 20.04.2015, at 09:59, Zsolt Tóth toth.zsolt@gmail.com wrote:
 
  Hi all,
 
  it looks like the 1.2.2 pre-built version for hadoop2.4 is not available
 on the mirror sites. Am I missing something?
 
  Regards,
  Zsolt


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Spark streaming job throwing ClassNotFound exception when recovering from checkpointing

2015-02-10 Thread Conor Fennell
I am getting the following error when I kill the spark driver and restart
the job:

15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
java.io.IOException: java.lang.ClassNotFoundException: com.example.spark.streaming.reporting.live.jobs.Bucket
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)



Spark version is 1.2.0

The streaming job executes every 10 seconds with the following steps:

   1. Consume JSON from a kafka topic called journeys and convert it to
   case classes
   2. Filter the resulting journeys stream based on a time attribute being set
   3. FlatMap the journeys into separate time buckets of 15, 60 and 1440 minutes,
   e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000, hyperLogLog(journey
   id), 360))
   4. ReduceByKey, adding the hyperloglogs
   5. UpdateStateByKey to add to the previous state's hyperloglog
   6. Output the results to Cassandra


I have made a sample app below to mimic the problem, with all the classes
put into one file; it is also attached to this email.

To get around the issue for the moment, I have removed the Bucket class and
stopped passing a bucket array into the ActiveJourney class.
Instead I hard-code all the time buckets I need in the ActiveJourney
class; this approach works and recovers from checkpointing, but it is not
extensible.

Can the Spark gurus explain why I get that ClassNotFound exception?

If you need any more information, please let me know.

Much thanks,
Conor


package com.example.spark.streaming.reporting.live.jobs

import java.util.Date

import scala.Array.canBuildFrom
import scala.collection.mutable.MutableList

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jvalue2extractable
import org.json4s.string2JsonInput

import com.example.spark.streaming.utils.MilliSecondUtils
import com.example.spark.streaming.utils.constants.ColumnFamilies
import com.example.spark.streaming.utils.constants.Constants
import com.example.spark.streaming.utils.constants.Milliseconds
import com.example.spark.streaming.utils.constants.SparkConfig

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming.toDStreamFunctions
import com.datastax.spark.connector.toNamedColumnRef

import com.twitter.algebird.HLL
import com.twitter.algebird.HyperLogLogMonoid

// Json parsing classes
case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
case class JourneyDetails(_id: String)
case class JourneyCommand($set: Option[JourneySet])
case class JourneySet(awayAt: Date)

case class Bucket(val bucketType: String, val roundDown: (Long) => Long,
  val columnFamily: String, val size: Long, val maxIntervals: Int)

case class ActiveState(var bucketType: String, var time: Long,
  var hyperLogLog: HLL, var ttl: Int)

object SampleJob {
  private final val Name = this.getClass().getSimpleName()

  def main(args: Array[String]) {
    if (args.length < 8) {
      System.err.println(s"Usage: $Name <environment> <zkQuorum> <group> " +
        s"<topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>")
      System.exit(1)
    }
    System.out.print(args)

    val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri,
      cassandra, intervalSeconds) = args

    val checkpointDirectory = hdfsUri + "/reporting/" + Name +
      getClass().getPackage().getImplementationVersion()

    def functionToCreateContext(): StreamingContext = {

      // how many buckets
      val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour,
        ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
      val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour,
        ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
      val day = Bucket("DAY", MilliSecondUtils.roundDownToNearestDay,
        ColumnFamilies.Visits_1440, Milliseconds.Day, 8640)

      val activeJourneys = new ActiveJourney(Array(fifteen, hour, day))

      val sparkConf = new SparkConf()
        .setAppName(Name)
        .set(SparkConfig.SparkMesosCoarse, Constants.True)
        .set(SparkConfig.SparkCleanerTtl, "300")
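
The sample code is cut off at this point in the archive. For context, a sketch of
the standard recovery entry point that such a functionToCreateContext normally
feeds into (Spark 1.2 streaming API; this is not the original sample, which was
attached to the email):

// Recreates the StreamingContext from the checkpoint if one exists, otherwise
// builds a fresh one; every class referenced by the checkpointed DStream graph
// (including Bucket) has to be loadable on the driver at recovery time.
val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()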
 
