Re: Using data in RDD to specify HDFS directory to write to

2014-11-17 Thread jschindler
Yes, thank you for the suggestion.  The error below is what I found in the
worker logs.

AssociationError [akka.tcp://sparkwor...@cloudera01.local.company.com:7078]
-> [akka.tcp://sparkexecu...@cloudera01.local.company.com:33329]: Error
[Association failed with
[akka.tcp://sparkexecu...@cloudera01.local.company.com:33329]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkexecu...@cloudera01.local.company.com:33329]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: cloudera01.local.company.com/10.40.19.67:33329
]

I looked into suggestions for this type of error, and before I found the real
cause I upgraded my CDH to 5.2 so I could try setting the driver and executor
ports explicitly rather than letting Spark choose them at random.  My boss
later turned off iptables and I no longer get that error; I do get a different
one, however.  I have also gone back into my project and changed my Hadoop
version to 2.5.0-cdh5.2.0, so that should not be a problem.
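
For reference, this is roughly what I mean by pinning the ports (a minimal,
untested sketch; the port numbers are made up, not from my actual config, and
I believe spark.executor.port needs Spark 1.1+):

    // Fix the ports Spark listens on so the firewall can whitelist them,
    // instead of letting Spark pick random ephemeral ports.
    val conf = new SparkConf()
      .setAppName("Data")
      .setMaster("spark://cloudera01.local.company.com:7077")
      .set("spark.driver.port", "7001")        // example values only
      .set("spark.executor.port", "7002")
      .set("spark.blockManager.port", "7003")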

From the master logs:

2014-11-17 18:09:49,707 ERROR akka.remote.EndpointWriter: AssociationError
[akka.tcp://sparkmas...@cloudera01.local.local.com:7077] ->
[akka.tcp://spark@localhost:38181]: Error [Association failed with
[akka.tcp://spark@localhost:38181]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://spark@localhost:38181]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: localhost/127.0.0.1:38181
]

2014-11-17 18:19:08,271 INFO akka.actor.LocalActorRef: Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://sparkMaster/deadLetters] to
Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.40.19.67%3A37795-29#-1248895472]
was not delivered. [30] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
2014-11-17 18:19:28,251 ERROR Remoting:
org.apache.spark.deploy.ApplicationDescription; local class incompatible:
stream classdesc serialVersionUID = 583745679236071411, local class
serialVersionUID = 7674242335164700840
java.io.InvalidClassException:
org.apache.spark.deploy.ApplicationDescription; local class incompatible:
stream classdesc serialVersionUID = 583745679236071411, local class
serialVersionUID = 7674242335164700840
at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at
akka.serialization.Serialization.deserialize(Serialization.scala:98)
at
akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:58)
at
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at
akka.serialization.Serialization.deserialize(Serialization.scala:98)
at
akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at
akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
at
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(For
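
For what it's worth, the InvalidClassException above usually means the
application was built against a different Spark version than the one the
master is running.  A minimal build.sbt sketch that keeps them aligned (the
version numbers are assumptions; I believe CDH 5.2 ships Spark 1.1.0):

    // build.sbt -- keep the Spark artifacts at exactly the cluster's version
    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"            % "1.1.0" % "provided",
      "org.apache.spark" %% "spark-streaming"       % "1.1.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0"
    )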

Re: Using data in RDD to specify HDFS directory to write to

2014-11-15 Thread jschindler
UPDATE

I have removed and added things to the job systematically and have figured out
that constructing the SparkContext object is what causes it to fail.

The last run contained the code below.

I keep losing executors and I'm not sure why.  Some of the relevant Spark
output is below; I will add more on Monday, as I must go participate in
weekend activities.

 14/11/15 14:53:43 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20141115145328-0025/3 on hostPort cloudera01.local.company.com:7078 with
8 cores, 512.0 MB RAM
14/11/15 14:53:43 INFO AppClient$ClientActor: Executor updated:
app-20141115145328-0025/3 is now RUNNING
14/11/15 14:53:46 INFO MemoryStore: ensureFreeSpace(1063) called with
curMem=1063, maxMem=309225062
14/11/15 14:53:46 INFO MemoryStore: Block input-0-1416084826000 stored as
bytes to memory (size 1063.0 B, free 294.9 MB)
14/11/15 14:53:46 INFO BlockManagerInfo: Added input-0-1416084826000 in
memory on cloudera01.local.company.com:49902 (size: 1063.0 B, free: 294.9
MB)
14/11/15 14:53:46 INFO BlockManagerMaster: Updated info of block
input-0-1416084826000
14/11/15 14:53:46 WARN BlockManager: Block input-0-1416084826000 already
exists on this machine; not re-adding it
14/11/15 14:53:46 INFO BlockGenerator: Pushed block input-0-1416084826000
14/11/15 14:53:46 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://sparkexecu...@cloudera01.local.company.com:52715/user/Executor#-1518587721]
with ID 3
14/11/15 14:53:47 INFO BlockManagerInfo: Registering block manager
cloudera01.local.company.com:46926 with 294.9 MB RAM
14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor 3 disconnected,
so removing it
14/11/15 14:53:47 ERROR TaskSchedulerImpl: Lost an executor 3 (already
removed): remote Akka client disassociated
14/11/15 14:53:47 INFO AppClient$ClientActor: Executor updated:
app-20141115145328-0025/3 is now EXITED (Command exited with code 1)
14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor
app-20141115145328-0025/3 removed: Command exited with code 1
14/11/15 14:53:47 INFO AppClient$ClientActor: Executor added:
app-20141115145328-0025/4 on
worker-20141114114152-cloudera01.local.company.com-7078
(cloudera01.local.company.com:7078) with 8 cores

BLOCK 2 - the last block before the app fails:

14/11/15 14:54:15 INFO BlockManagerInfo: Registering block manager
cloudera01.local.uship.com:34335 with 294.9 MB RAM
14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor 9 disconnected,
so removing it
14/11/15 14:54:16 ERROR TaskSchedulerImpl: Lost an executor 9 (already
removed): remote Akka client disassociated
14/11/15 14:54:16 INFO AppClient$ClientActor: Executor updated:
app-20141115145328-0025/9 is now EXITED (Command exited with code 1)
14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor
app-20141115145328-0025/9 removed: Command exited with code 1
14/11/15 14:54:16 ERROR SparkDeploySchedulerBackend: Application has been
killed. Reason: Master removed our application: FAILED
14/11/15 14:54:16 ERROR TaskSchedulerImpl: Exiting due to error from cluster
scheduler: Master removed our application: FAILED
[hdfs@cloudera01 root]$



import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark._

import org.json4s._
import org.json4s.native.JsonMethods._

import scala.collection.mutable.Map
import scala.collection.mutable.MutableList

case class Event(EventName: String, Payload: org.json4s.JValue)

object App {

  def main(args: Array[String]) {

    val ssc = new StreamingContext("local[2]", "Data", Seconds(20))
    ssc.checkpoint("checkpoint")

    val conf = new SparkConf().setMaster("spark://cloudera01.local.company.com:7077")
    val sc = new SparkContext(conf)

    val eventMap = scala.collection.immutable.Map("Events" -> 1)
    val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)

    val eventStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      val eventName = (json \ "event").extractOpt[String]
      Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
    })

    eventStream.foreach(x => {
      var arr = x.toArray
      x.foreachPartition(y => {
        y.foreach(z => { print(z) })
      })
    })


ssc.start()
ssc.awaitTermination()

  }

} 
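
For reference, a minimal sketch (untested) of what I think the single-context
version would look like, building the StreamingContext from the SparkConf
instead of constructing a separate SparkContext alongside it:

    // Sketch only: one context, created from the cluster SparkConf.
    val conf = new SparkConf()
      .setAppName("Data")
      .setMaster("spark://cloudera01.local.company.com:7077")
    val ssc = new StreamingContext(conf, Seconds(20))
    ssc.checkpoint("checkpoint")
    // ssc.sparkContext exposes the underlying SparkContext if it is needed directly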





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-data-in-RDD-to-specify-HDFS-directory-to-write-to-tp18789p19012.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Using data in RDD to specify HDFS directory to write to

2014-11-14 Thread jschindler
I reworked my app using your idea of putting the data in a map.  It looks like
it should work, but I'm getting some strange errors and my job gets
terminated.  I get a

"WARN TaskSchedulerImpl: Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have sufficient
memory"

and before that, in the Spark output, I receive:


"Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 2.0:0 failed 1 times, most recent failure: Exception
failure in TID 3 on host localhost: org.apache.spark.SparkException: Job
aborted due to stage failure: Task 0.0:0 failed 4 times, most recent
failure: TID 7 on host cloudera01.local.company.com failed for unknown
reason
Driver stacktrace:
   
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
   
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
   
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
   
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
   
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
   
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
scala.Option.foreach(Option.scala:236)
   
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
   
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
akka.actor.ActorCell.invoke(ActorCell.scala:456)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
akka.dispatch.Mailbox.run(Mailbox.scala:219)
   
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)"

I am using CDH 5.1.2.  After first experiencing this failure, I set
worker_max_heapsize back to its default value of 512 MB and
executor_total_max_heapsize back to its default value of 8 GB, since I read
that doing so cured this problem for someone.
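
Related to that warning, a minimal sketch (assumed values, not my actual
config) of capping what the application requests so it fits inside what the
workers advertise:

    // Ask for less than the workers offer; the "has not accepted any resources"
    // warning usually means the request can never be satisfied.
    val conf = new SparkConf()
      .setAppName("Data")
      .setMaster("spark://cloudera01.local.company.com:7077")
      .set("spark.executor.memory", "512m")  // must fit in a worker's advertised memory
      .set("spark.cores.max", "2")           // don't require every core in the cluster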

The code is below

object App {

  def main(args: Array[String]) {

val ssc = new StreamingContext("local[2]", "Data", Seconds(20))
ssc.checkpoint("checkpoint")

val eventMap = scala.collection.immutable.Map("uShip.Events" -> 1)
    val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)

val eventStream = pipe.map(data => {
  par

Using data in RDD to specify HDFS directory to write to

2014-11-12 Thread jschindler
I am having trouble figuring out how to solve a problem.  I would like to
stream events from Kafka to my Spark Streaming app and write the contents of
each RDD out to an HDFS directory.  Each event that comes into the app via
Kafka will be JSON and have an event field with the name of the event.  I
would like to grab the event name and then write the event out to
hdfs:///user/hdfs/.

My first intuition was to grab the event name and put it into the RDD, then
run a foreachRDD loop and call saveAsTextFile, concatenating the event name
into the directory path.  I have pasted the code below, but it will not work
since I cannot access the data inside an RDD inside a foreachRDD loop.  If I
dump all the RDD data into an array using .collect, I won't be able to use the
.saveAsTextFile() method.  I'm at a loss for coming up with a way to do this.
Any ideas/help would be greatly appreciated, thanks!


case class Event(EventName: String, Payload: org.json4s.JValue)

object App {

  def main(args: Array[String]) {

val ssc = new StreamingContext("local[6]", "Data", Seconds(20))
ssc.checkpoint("checkpoint")

    val eventMap = Map("Events" -> 1)
    val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)

    val eventStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      val eventName = (json \ "event").extractOpt[String]
      (eventName, json)
      Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
    })

    eventStream.foreachRDD(event => {
      //val eventName = event.EventName  //CAN'T ACCESS eventName!
      event.saveAsTextFile("hdfs://ip-here/user/hdfs/" + eventName + "/rdd=" + pageHit.id)  //what I would like to do if I could access eventName
    })
  

ssc.start()
ssc.awaitTermination()

  }
}
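
The closest thing I have sketched out (untested, and I don't know if it is
reasonable) is to collect the distinct event names for each batch on the
driver, then filter and save once per name.  It assumes the json4s JsonMethods
imports already used for parse, and writes the payload back out as compact
JSON:

    eventStream.foreachRDD(rdd => {
      // Pull only the event names present in this batch back to the driver
      val names = rdd.map(_.EventName).distinct().collect()
      names.foreach(name => {
        rdd.filter(_.EventName == name)
           .map(e => compact(render(e.Payload)))   // back to JSON text
           .saveAsTextFile("hdfs://ip-here/user/hdfs/" + name + "/rdd-" + System.currentTimeMillis)
      })
    })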





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-data-in-RDD-to-specify-HDFS-directory-to-write-to-tp18789.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Writing to RabbitMQ

2014-08-19 Thread jschindler
Thanks for the quick and clear response!  I now have a better understanding of
what is going on regarding the driver and worker nodes, which will help me
greatly.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12386.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Writing to RabbitMQ

2014-08-18 Thread jschindler
Well, it looks like I can use the .repartition(1) method to put everything in
one partition, which gets rid of the duplicate messages I send to RabbitMQ,
but that seems like a bad idea.  Wouldn't it hurt scalability?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12324.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Writing to RabbitMQ

2014-08-18 Thread jschindler
I am running into a different problem with this Spark app right now, and I'm
thinking it may be because I am publishing to RabbitMQ inside a
foreachPartition loop.  I would like to publish once for each window, but the
app is publishing a lot more than that (it varies; sometimes it publishes 5
messages, sometimes 15).  I am running this locally with 2 cores specified, I
believe (local[2]), so I wouldn't think there would be many partitions.  Does
anyone have any ideas on how to resolve this?  Below is the code, thanks for
any help!


import java.util.Calendar
import java.text.SimpleDateFormat


import kafka.producer._

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

import com.mongodb.casbah.Imports._

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer

import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{ read, write, writePretty }

case class PageHit(SQLCalls: Double, URL: String, PageLoadTime: Double)

case class RabbitPayload2(URL: String,SQLCalls: Double, PageLoadTime:
Double, NumberPageHits: Int, CurrentSecond: String)

object myData {
  def main(args: Array[String]) {

val ssc = new StreamingContext("local[2]", "Data", Seconds(30))
ssc.checkpoint("checkpoint")
val topicMap = Map("data" -> 1)

    val pipe = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_2,Zookeeper_3", "Cons1", topicMap).map(_._2)

    val pageHitStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      val SQLCalls = (json \ "SQLCalls").extractOpt[Double]
      val URL = (json \ "URL").extractOpt[String]
      val PageLoadTime = (json \ "PageLoadTime").extractOpt[Double]

      PageHit(SQLCalls.getOrElse(0.0), URL.getOrElse("Empty"), PageLoadTime.getOrElse(0.0))
    })

    def truncate(value: Double): Double = { return (value * 1000).round / 1000.toDouble }

    val windowStream = pageHitStream.filter(pageHit => pageHit.URL != "Empty").window(Minutes(1), Seconds(30))

    windowStream.foreachRDD(pageHit => {

      if (pageHit.count > 0) {
        val SQLCallArray = pageHit.collect.map(pageHit => pageHit.SQLCalls)
        var avgNumberSQlCalls = 0.0
        avgNumberSQlCalls = truncate((SQLCallArray.reduceLeft[Double](_ + _)) / SQLCallArray.length)

        var totalPageLoadTime = 0.0
        val PageLoadTimeArray = pageHit.collect.map(pageHit => pageHit.PageLoadTime)

        val avgPageLoadTime = truncate(PageLoadTimeArray.reduceLeft[Double](_ + _) / PageLoadTimeArray.length)

        var URLArray = pageHit.collect.take(1).map(pageHit => pageHit.URL)
        val URL = URLArray(0)

        pageHit.foreachPartition(partition => {
          val factory = new ConnectionFactory()
          factory.setUsername("myUserName")
          factory.setPassword("myPassword")
          factory.setVirtualHost("/")
          factory.setHost("rabbits")
          factory.setPort(5672)
          val connection = factory.newConnection()

          val pageHitChannel = connection.createChannel()

          pageHitChannel.exchangeDeclare("PageHits", "fanout")

          val today = Calendar.getInstance().getTime()
          val currentSecondAsString = new SimpleDateFormat("ss")
          val secString = currentSecondAsString.format(today)

          implicit val formats = DefaultFormats

          val payload2 = RabbitPayload2(URL, avgNumberSQlCalls, avgPageLoadTime, SQLCallArray.length / 2, secString)

          pageHitChannel.basicPublish("PageHits", "", null, writePretty(payload2).getBytes)

          val mongoClient = MongoClient("mongodb")
          val db = mongoClient("Dev")
          val SQLCollection = db("SQLCalls")

          SQLCollection += MongoDBObject("Event" -> "Page Hit",
            "URL" -> URL,
            "Avg number of SQL Calls" -> avgNumberSQlCalls,
            "Avg Page Load Time" -> avgPageLoadTime,
            "Number of Page Hits in window" -> SQLCallArray.length / 2)

        })
      }
    })

ssc.start()
ssc.awaitTermination()

  }
}
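
The alternative I am considering, as a rough untested sketch only (connection
settings trimmed): since the window's aggregates are already collected on the
driver, publish a single message per window from the driver inside foreachRDD
instead of once per partition.

    windowStream.foreachRDD(pageHits => {
      if (pageHits.count > 0) {
        val hits = pageHits.collect()
        val avgSQLCalls = truncate(hits.map(_.SQLCalls).sum / hits.length)
        val avgLoadTime = truncate(hits.map(_.PageLoadTime).sum / hits.length)
        val url = hits.head.URL

        // Driver-side connection: one publish per window, not per partition
        val factory = new ConnectionFactory()   // same credentials/vhost as above, omitted here
        factory.setHost("rabbits")
        factory.setPort(5672)
        val connection = factory.newConnection()
        val channel = connection.createChannel()
        channel.exchangeDeclare("PageHits", "fanout")

        implicit val formats = DefaultFormats
        val secString = new SimpleDateFormat("ss").format(Calendar.getInstance().getTime())
        val payload = RabbitPayload2(url, avgSQLCalls, avgLoadTime, hits.length / 2, secString)
        channel.basicPublish("PageHits", "", null, writePretty(payload).getBytes)

        channel.close()
        connection.close()
      }
    })

Closing the channel and connection at the end of each window should also keep
the driver from leaking sockets.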





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12322.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Writing to RabbitMQ

2014-08-05 Thread jschindler
You are correct that I am trying to publish inside a foreachRDD loop.  I am
currently refactoring and will try publishing inside a foreachPartition loop.
Below is the code showing the way it is currently written, thanks!


object myData {
  def main(args: Array[String]) {

val ssc = new StreamingContext("local[8]", "Data", Seconds(10))
ssc.checkpoint("checkpoint")
val topicMap = Map("pagehit.data" -> 1)

val factory = new ConnectionFactory()
factory.setUsername("officialUsername")
factory.setPassword("crypticPassword")
factory.setVirtualHost("/")
factory.setHost("rabbit-env")
factory.setPort()
val connection = factory.newConnection()

val SQLChannel = connection.createChannel()
SQLChannel.queueDeclare("SQLQueue", true, false, false, null) 

    val Pipe = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1", topicMap).map(_._2)

    //PARSE SOME JSON ETC

    windowStream.foreachRDD(pagehit => {
      val mongoClient = MongoClient("my-mongodb")
      val db = mongoClient("myClient")
      val SQLCollection = db("SQLCalls")

      val callArray = pagehit.map(_._1).collect
      val avg = (callArray.reduceLeft[Long](_+_)) / callArray.length
      val URL = pagehit.take(1).map(_._2)

      SQLCollection += MongoDBObject("URL" -> URL(0).substring(7, URL(0).length - 1),
        "Avg Page Load Time" -> avg)

      val toBuildJSON = Seq(baseMsg, avg.toString, closingBrace)
      val byteArray = toBuildJSON.mkString.getBytes()

      SQLChannel.basicPublish("", "SQLQueue", null, byteArray)

    })
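
What I have in mind for the refactor, as an untested sketch (the per-record
serialization is glossed over with a hypothetical buildJson helper): create
the connection inside foreachPartition on the workers, so no channel object
gets captured in the DStream closure.

    windowStream.foreachRDD(pagehit => {
      pagehit.foreachPartition(partition => {
        // Connection and channel are created on the worker, per partition,
        // so nothing non-serializable is shipped with the closure.
        val factory = new ConnectionFactory()
        factory.setHost("rabbit-env")          // same credentials/port as above, omitted
        val connection = factory.newConnection()
        val channel = connection.createChannel()
        channel.queueDeclare("SQLQueue", true, false, false, null)

        partition.foreach(record => {
          channel.basicPublish("", "SQLQueue", null, buildJson(record).getBytes)  // buildJson is a placeholder
        })

        channel.close()
        connection.close()
      })
    })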



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p11445.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Writing to RabbitMQ

2014-08-03 Thread jschindler
I have been trying to write to RabbitMQ from my Spark Streaming app and I
receive the exception below:

java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN

Does anyone have experience sending their data to Rabbit?
I am using the basicPublish call like so -> SQLChannel.basicPublish("",
"SQLQueue", null, byteArray)

I think I read earlier that Spark may be trying to serialize the data, and
that can't be done to a byte array, which is what Rabbit demands, from what I
have seen.

When I run Spark locally, my data is actually being sent to the queue, but I
have a feeling it wouldn't work in production on a cluster.

Below is the full trace of the exception; please let me know if anyone has an
idea of what the problem is, thanks!

14/08/03 12:20:00 ERROR actor.OneForOneStrategy:
com.rabbitmq.client.impl.ChannelN
java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
at
org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at 
org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
at
org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
at
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorker

Re: Spark Streaming Error Help -> ERROR actor.OneForOneStrategy: key not found:

2014-07-03 Thread jschindler
I think I have found my answers, but if anyone has thoughts please share.

After testing for a while, I think the error doesn't have any effect on the
process.

I think it must be the case that there are elements left in the window from
the last run; otherwise my system is completely whack.

Please let me know if any of this looks incorrect, thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Error-Help-ERROR-actor-OneForOneStrategy-key-not-found-tp8746p8750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Streaming Error Help -> ERROR actor.OneForOneStrategy: key not found:

2014-07-03 Thread jschindler
I am getting this "ERROR actor.OneForOneStrategy: key not found:" exception
when I run my code, and I'm not sure where it is looking for a key.  My setup
is that I send packets to a third-party service, which then uses a webhook to
hit one of our servers, which then logs them using Kafka.  I am just trying to
total up the number of elements/packets that come in, using
".countByWindow(Seconds(30), Seconds(15)).print()".  I don't see anything in
the docs about countByWindow demanding a key-value pair.

I still see counts as output, so I'm thinking the exception isn't hurting
anything...

Also, I am confused about some of the counts.  I start Spark up and it has
been producing counts greater than zero when I haven't sent any data packets
since the start of that run.  Are there packets saved that didn't get pushed
through from the last session?  That seems odd; I would think the count would
be 0 if I haven't sent any data...

I'll post the code and exception trace below, thanks for any help!

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf


object RunningPageTotal {
  def main(args: Array[String]) {

    val ssc = new StreamingContext("local[2]", "PageHitTotals", Seconds(5))
    ssc.checkpoint("checkpoint")
    val topicMap = Map("User.Profile.Page.Hits" -> 1)

    val pageHitStream = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1", topicMap).map(_._2)
    val totalHits = pageHitStream.countByWindow(Seconds(30), Seconds(15)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}


14/07/03 12:42:25 INFO dstream.MappedDStream: Time 1404409345000 ms is
invalid as zeroTime is 1404409335000 ms and slideDuration is 15000 ms and
difference is 1 ms
14/07/03 12:42:25 ERROR actor.OneForOneStrategy: key not found:
1404409345000 ms
java.util.NoSuchElementException: key not found: 1404409345000 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at
org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
at
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Error-Help-ERROR-actor-OneForOneStrategy-key-not-found-tp8746.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.