Re: Using data in RDD to specify HDFS directory to write to
Yes, thank you for the suggestion. I found the error below in the worker logs:

    AssociationError [akka.tcp://sparkwor...@cloudera01.local.company.com:7078] -> [akka.tcp://sparkexecu...@cloudera01.local.company.com:33329]: Error [Association failed with [akka.tcp://sparkexecu...@cloudera01.local.company.com:33329]] [
    akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@cloudera01.local.company.com:33329]
    Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: cloudera01.local.company.com/10.40.19.67:33329
    ]

I looked into suggestions for this type of error. Before I found the real cause, I upgraded CDH to 5.2 so that I could try setting the driver and executor ports explicitly rather than having Spark choose them at random. My boss later turned off iptables, and I no longer get that error. I do get a different one, however. I have gone back into my project and changed my Hadoop version to 2.5.0-cdh5.2.0, so that should not be the problem. From the master logs:

    2014-11-17 18:09:49,707 ERROR akka.remote.EndpointWriter: AssociationError [akka.tcp://sparkmas...@cloudera01.local.local.com:7077] -> [akka.tcp://spark@localhost:38181]: Error [Association failed with [akka.tcp://spark@localhost:38181]] [
    akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@localhost:38181]
    Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: localhost/127.0.0.1:38181
    ]
    2014-11-17 18:19:08,271 INFO akka.actor.LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.40.19.67%3A37795-29#-1248895472] was not delivered. [30] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    2014-11-17 18:19:28,251 ERROR Remoting: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 583745679236071411, local class serialVersionUID = 7674242335164700840
    java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 583745679236071411, local class serialVersionUID = 7674242335164700840
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:58)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(For
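The InvalidClassException above means the master and the application disagree on the serialVersionUID of `org.apache.spark.deploy.ApplicationDescription`. That usually indicates the application was built or launched against a different Spark build than the one the cluster runs. Below is a minimal diagnostic sketch. It assumes you run it once with the application's classpath and once with the cluster's Spark jars. It prints the serialVersionUID that the local JVM computes for a class name. The object name `UidCheck` is hypothetical.

```scala
import java.io.ObjectStreamClass

// Hypothetical diagnostic: print the serialVersionUID that this JVM's classpath
// assigns to a Serializable class, so that two classpaths can be compared.
object UidCheck {
  // None if the class is missing from the classpath or is not Serializable
  // (ObjectStreamClass.lookup returns null for non-serializable classes).
  def uidOf(className: String): Option[Long] =
    try Option(ObjectStreamClass.lookup(Class.forName(className))).map(_.getSerialVersionUID)
    catch { case _: ClassNotFoundException => None }

  def main(args: Array[String]): Unit = {
    val name = args.headOption.getOrElse("org.apache.spark.deploy.ApplicationDescription")
    println(s"$name -> ${uidOf(name).map(_.toString).getOrElse("not found")}")
  }
}
```

If the two numbers differ, the likely fix is to align the Spark dependency in the build with the Spark version shipped with the cluster's CDH release.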
Re: Using data in RDD to specify HDFS directory to write to
UPDATE: I have systematically removed and re-added pieces of the job and found that constructing the SparkContext object is what causes it to fail. The last run contained the code below. Apparently I keep losing executors, and I'm not sure why. Some of the relevant Spark output is below. I will add more on Monday, as I have to go take part in weekend activities.

    14/11/15 14:53:43 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141115145328-0025/3 on hostPort cloudera01.local.company.com:7078 with 8 cores, 512.0 MB RAM
    14/11/15 14:53:43 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/3 is now RUNNING
    14/11/15 14:53:46 INFO MemoryStore: ensureFreeSpace(1063) called with curMem=1063, maxMem=309225062
    14/11/15 14:53:46 INFO MemoryStore: Block input-0-1416084826000 stored as bytes to memory (size 1063.0 B, free 294.9 MB)
    14/11/15 14:53:46 INFO BlockManagerInfo: Added input-0-1416084826000 in memory on cloudera01.local.company.com:49902 (size: 1063.0 B, free: 294.9 MB)
    14/11/15 14:53:46 INFO BlockManagerMaster: Updated info of block input-0-1416084826000
    14/11/15 14:53:46 WARN BlockManager: Block input-0-1416084826000 already exists on this machine; not re-adding it
    14/11/15 14:53:46 INFO BlockGenerator: Pushed block input-0-1416084826000
    14/11/15 14:53:46 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@cloudera01.local.company.com:52715/user/Executor#-1518587721] with ID 3
    14/11/15 14:53:47 INFO BlockManagerInfo: Registering block manager cloudera01.local.company.com:46926 with 294.9 MB RAM
    14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor 3 disconnected, so removing it
    14/11/15 14:53:47 ERROR TaskSchedulerImpl: Lost an executor 3 (already removed): remote Akka client disassociated
    14/11/15 14:53:47 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/3 is now EXITED (Command exited with code 1)
    14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor app-20141115145328-0025/3 removed: Command exited with code 1
    14/11/15 14:53:47 INFO AppClient$ClientActor: Executor added: app-20141115145328-0025/4 on worker-20141114114152-cloudera01.local.company.com-7078 (cloudera01.local.company.com:7078) with 8 cores

BLOCK 2 - last block before app fails:

    14/11/15 14:54:15 INFO BlockManagerInfo: Registering block manager cloudera01.local.uship.com:34335 with 294.9 MB RAM
    14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor 9 disconnected, so removing it
    14/11/15 14:54:16 ERROR TaskSchedulerImpl: Lost an executor 9 (already removed): remote Akka client disassociated
    14/11/15 14:54:16 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/9 is now EXITED (Command exited with code 1)
    14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor app-20141115145328-0025/9 removed: Command exited with code 1
    14/11/15 14:54:16 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
    14/11/15 14:54:16 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: Master removed our application: FAILED

    import kafka.producer._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    import org.apache.spark._
    import org.json4s._
    import org.json4s.native.JsonMethods._
    import scala.collection.mutable.Map
    import scala.collection.mutable.MutableList

    case class Event(EventName: String, Payload: org.json4s.JValue)

    object App {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "Data", Seconds(20))
        ssc.checkpoint("checkpoint")
        val conf = new SparkConf().setMaster("spark://cloudera01.local.company.com:7077")
        // Second context in the same JVM: the StreamingContext above already created one
        val sc = new SparkContext(conf)
        val eventMap = scala.collection.immutable.Map("Events" -> 1)
        val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)
        val eventStream = pipe.map(data => {
          parse(data)
        }).map(json => {
          implicit val formats = DefaultFormats
          val eventName = (json \ "event").extractOpt[String]
          Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
        })
        eventStream.foreach(x => {
          var arr = x.toArray
          x.foreachPartition(y => {
            y.foreach(z => {print(z)})
          })
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-data-in-RDD-to-specify-HDFS-directory-to-write-to-tp18789p19012.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Using data in RDD to specify HDFS directory to write to
I reworked my app using your idea of throwing the data in a map. It looks like it should work, but I'm getting some strange errors and my job gets terminated. I get a "WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory". Before that, in the Spark output, I receive:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 1 times, most recent failure: Exception failure in TID 3 on host localhost: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent failure: TID 7 on host cloudera01.local.company.com failed for unknown reason
    Driver stacktrace:
        org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        scala.Option.foreach(Option.scala:236)
        org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        akka.actor.ActorCell.invoke(ActorCell.scala:456)
        akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        akka.dispatch.Mailbox.run(Mailbox.scala:219)
        akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I am using CDH 5.1.2. After I first hit this failure, I read that changing the heap settings had cured this problem for someone. So I set worker_max_heapsize to its default value of 512 MB and executor_total_max_heapsize to its default value of 8 GB. The code is below:

    object App {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "Data", Seconds(20))
        ssc.checkpoint("checkpoint")
        val eventMap = scala.collection.immutable.Map("uShip.Events" -> 1)
        val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)
        val eventStream = pipe.map(data => {
          par
Using data in RDD to specify HDFS directory to write to
I am having trouble figuring out how to solve a problem. I would like to stream events from Kafka to my Spark Streaming app and write the contents of each RDD out to an HDFS directory. Each event that comes into the app via Kafka will be JSON and will have an event field with the name of the event. I would like to grab the event name and then write out the event to hdfs:///user/hdfs/.

My first intuition was to grab the event name and put it into the RDD, then run a foreachRDD loop and call saveAsTextFile, concatenating the event name into the directory path. I have pasted the code below. It will not work, because I cannot access the data inside an RDD within a foreachRDD loop. If I dump all the RDD data into an array using .collect, I won't be able to use the .saveAsTextFile() method. I'm at a loss for a way to do this. Any ideas or help would be greatly appreciated, thanks!

    case class Event(EventName: String, Payload: org.json4s.JValue)

    object App {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[6]", "Data", Seconds(20))
        ssc.checkpoint("checkpoint")
        val eventMap = Map("Events" -> 1)
        val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)
        val eventStream = pipe.map(data => {
          parse(data)
        }).map(json => {
          implicit val formats = DefaultFormats
          val eventName = (json \ "event").extractOpt[String]
          (eventName, json)
          Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
        })
        eventStream.foreachRDD(event => {
          //val eventName = event.EventName //CAN'T ACCESS eventName!
          event.saveAsTextFile("hdfs://ip-here/user/hdfs/" + eventName + "/rdd=" + pageHit.id) //what I would like to do if I could access eventName
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-data-in-RDD-to-specify-HDFS-directory-to-write-to-tp18789.html
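One common way around not being able to "see" the event name inside foreachRDD is to collect only the distinct event names to the driver, which is usually a small set. You then filter the RDD once per name and call saveAsTextFile on each filtered RDD. In the real job that would look roughly like `rdd.map(_.EventName).distinct().collect()` followed by `rdd.filter(_.EventName == name).saveAsTextFile(path)` for each name. The sketch below models that logic with plain Scala collections so it runs without Spark. The path layout, the `batchTimeMs` suffix (standing in for the undefined `pageHit.id`; the `foreachRDD((rdd, time) => ...)` overload supplies `time.milliseconds`), and the helper names are all hypothetical.

```scala
// Plain-Scala model of splitting one batch by event name; no Spark needed.
object EventDirectories {
  // Simplified stand-in for the thread's Event(EventName, Payload) case class.
  final case class Event(eventName: String, payload: String)

  // Hypothetical output layout: <base>/<eventName>/rdd=<batchTimeMs>
  def outputPath(base: String, eventName: String, batchTimeMs: Long): String =
    s"$base/$eventName/rdd=$batchTimeMs"

  // Mirrors: names = rdd.map(_.EventName).distinct().collect()
  //          for (n <- names) rdd.filter(_.EventName == n).saveAsTextFile(outputPath(...))
  // Returns which events would be written to which directory.
  def planWrites(events: Seq[Event], base: String, batchTimeMs: Long): Map[String, Seq[Event]] = {
    val names = events.map(_.eventName).distinct
    names.map(n => outputPath(base, n, batchTimeMs) -> events.filter(_.eventName == n)).toMap
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(Event("PageHit", "{...}"), Event("Login", "{...}"), Event("PageHit", "{...}"))
    planWrites(batch, "hdfs:///user/hdfs", 1416084826000L).foreach { case (path, evs) =>
      println(s"$path -> ${evs.size} event(s)")
    }
  }
}
```

Filtering once per name scans the batch once for every name. With many distinct names, calling `rdd.cache()` before the loop is worth considering.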
Re: Writing to RabbitMQ
Thanks for the quick and clear response! I now have a better understanding of what is going on with the driver and worker nodes, which will help me greatly.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12386.html
Re: Writing to RabbitMQ
Well, it looks like I can use .repartition(1) to put everything in one partition. That gets rid of the duplicate messages I send to RabbitMQ, but it seems like it might be a bad idea. Wouldn't that hurt scalability?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12324.html
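Collapsing the window to one partition should not be necessary just to publish once. An alternative, sketched under assumptions: compute the window statistics with a distributed aggregation (in Spark, `RDD.aggregate` or `map` plus `reduce`), then publish the resulting small summary a single time from the driver inside foreachRDD. The code below models partitions as nested Seqs so it runs without Spark. `Stats`, `aggregate`, and the `publish` stand-in are hypothetical names.

```scala
// Plain-Scala model: per-partition partial sums are merged, then published once.
object AggregateThenPublish {
  final case class PageHit(sqlCalls: Double, url: String, pageLoadTime: Double)

  // Running totals that can be merged in any order, as RDD.aggregate requires.
  final case class Stats(count: Long, sqlCalls: Double, loadTime: Double) {
    def add(h: PageHit): Stats = Stats(count + 1, sqlCalls + h.sqlCalls, loadTime + h.pageLoadTime)
    def combine(o: Stats): Stats = Stats(count + o.count, sqlCalls + o.sqlCalls, loadTime + o.loadTime)
  }
  val Zero = Stats(0L, 0.0, 0.0)

  // Models rdd.aggregate(Zero)(_ add _, _ combine _):
  // fold inside each partition, then merge the partial results.
  def aggregate(partitions: Seq[Seq[PageHit]]): Stats =
    partitions.map(_.foldLeft(Zero)(_ add _)).foldLeft(Zero)(_ combine _)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(PageHit(4, "/a", 0.2), PageHit(6, "/a", 0.4)),
      Seq(PageHit(2, "/a", 0.3)),
      Seq.empty[PageHit]
    )
    var published = 0
    // Stand-in for channel.basicPublish on the driver.
    def publish(s: Stats): Unit = { published += 1; println(s"publish $s") }
    val stats = aggregate(partitions)
    if (stats.count > 0) publish(stats) // once per window, regardless of partition count
    println(s"messages published: $published")
  }
}
```

The averages are then `stats.sqlCalls / stats.count` and `stats.loadTime / stats.count`. Only the three numbers travel to the driver, not the whole window.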
Re: Writing to RabbitMQ
I am running into a different problem with this Spark app right now. I think it may be because I am publishing to RabbitMQ inside a foreachPartition loop. I would like to publish once per window, but the app publishes a lot more than that (it varies: sometimes it publishes 5 messages, sometimes 15). I am running this locally with, I believe, 2 cores specified (local[2]), so I wouldn't expect there to be many partitions. Does anyone have any ideas on how to resolve this? Below is the code, thanks for any help!

    import java.util.Calendar
    import java.text.SimpleDateFormat
    import kafka.producer._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    import com.mongodb.casbah.Imports._
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer
    import org.json4s._
    import org.json4s.native.JsonMethods._
    import org.json4s.native.Serialization
    import org.json4s.native.Serialization.{ read, write, writePretty }

    case class PageHit(SQLCalls: Double, URL: String, PageLoadTime: Double)
    case class RabbitPayload2(URL: String, SQLCalls: Double, PageLoadTime: Double, NumberPageHits: Int, CurrentSecond: String)

    object myData {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "Data", Seconds(30))
        ssc.checkpoint("checkpoint")
        val topicMap = Map("data" -> 1)
        val pipe = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_2,Zookeeper_3", "Cons1", topicMap).map(_._2)
        val pageHitStream = pipe.map(data => {
          parse(data)
        }).map(json => {
          implicit val formats = DefaultFormats
          val SQLCalls = (json \ "SQLCalls").extractOpt[Double]
          val URL = (json \ "URL").extractOpt[String]
          val PageLoadTime = (json \ "PageLoadTime").extractOpt[Double]
          PageHit(SQLCalls.getOrElse(0.0), URL.getOrElse("Empty"), PageLoadTime.getOrElse(0.0))
        })
        def truncate(value: Double): Double = {
          return (value * 1000).round / 1000.toDouble
        }
        val windowStream = pageHitStream.filter(pageHit => pageHit.URL != "Empty").window(Minutes(1), Seconds(30))
        windowStream.foreachRDD(pageHit => {
          if (pageHit.count > 0) {
            val SQLCallArray = pageHit.collect.map(pageHit => pageHit.SQLCalls)
            var avgNumberSQlCalls = 0.0
            avgNumberSQlCalls = truncate((SQLCallArray.reduceLeft[Double](_ + _)) / SQLCallArray.length)
            var totalPageLoadTime = 0.0
            val PageLoadTimeArray = pageHit.collect.map(pageHit => pageHit.PageLoadTime)
            val avgPageLoadTime = truncate(PageLoadTimeArray.reduceLeft[Double](_ + _) / PageLoadTimeArray.length)
            var URLArray = pageHit.collect.take(1).map(pageHit => pageHit.URL)
            val URL = URLArray(0)
            pageHit.foreachPartition(partition => {
              val factory = new ConnectionFactory()
              factory.setUsername("myUserName")
              factory.setPassword("myPassword")
              factory.setVirtualHost("/")
              factory.setHost("rabbits")
              factory.setPort(5672)
              val connection = factory.newConnection()
              val pageHitChannel = connection.createChannel()
              pageHitChannel.exchangeDeclare("PageHits", "fanout")
              val today = Calendar.getInstance().getTime()
              val currentSecondAsString = new SimpleDateFormat("ss")
              val secString = currentSecondAsString.format(today)
              implicit val formats = DefaultFormats
              val payload2 = RabbitPayload2(URL, avgNumberSQlCalls, avgPageLoadTime, SQLCallArray.length / 2, secString)
              pageHitChannel.basicPublish("PageHits", "", null, writePretty(payload2).getBytes)
              val mongoClient = MongoClient("mongodb")
              val db = mongoClient("Dev")
              val SQLCollection = db("SQLCalls")
              SQLCollection += MongoDBObject("Event" -> "Page Hit", "URL" -> URL, "Avg number of SQL Calls" -> avgNumberSQlCalls, "Avg Page Load Time" -> avgPageLoadTime, "Number of Page Hits in window" -> SQLCallArray.length / 2)
            })
          }
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12322.html
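The varying message count fits how foreachPartition works: its function runs once for every partition of the RDD, empty partitions included. For a receiver-based stream, the partition count is not tied to the two cores in local[2]. As I understand it, the Kafka receiver cuts a new block every `spark.streaming.blockInterval` (200 ms by default in Spark 1.x), and each block becomes a partition of that batch's RDD. A windowed RDD is the union of several batches, so a window can easily hold 5 to 15 partitions. The plain-Scala model below (hypothetical names, no Spark) counts publishes in two cases: with the publish call inside a per-partition callback, and with a single publish per window from the driver.

```scala
// Plain-Scala model of RDD.foreachPartition: the function runs once per partition,
// including partitions that happen to be empty.
object PublishCount {
  def foreachPartitionModel[T](partitions: Seq[Seq[T]])(f: Iterator[T] => Unit): Unit =
    partitions.foreach(p => f(p.iterator))

  // Publishing inside the per-partition function: one message per partition.
  def publishesInsidePartitions[T](partitions: Seq[Seq[T]]): Int = {
    var sent = 0
    foreachPartitionModel(partitions)(_ => sent += 1) // stand-in for basicPublish
    sent
  }

  // Mirrors `if (rdd.count > 0) publish(summary)` on the driver: at most one message.
  def publishesOncePerWindow[T](partitions: Seq[Seq[T]]): Int =
    if (partitions.exists(_.nonEmpty)) 1 else 0

  def main(args: Array[String]): Unit = {
    val window = Seq(Seq(1, 2), Seq(3), Seq(4, 5), Seq.empty[Int], Seq(6))
    println(s"inside foreachPartition: ${publishesInsidePartitions(window)} messages")
    println(s"once per window:         ${publishesOncePerWindow(window)} message")
  }
}
```

This is why moving the basicPublish call out of foreachPartition and into the body of foreachRDD (which runs on the driver) yields one message per window.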
Re: Writing to RabbitMQ
You are correct that I am trying to publish inside a foreachRDD loop. I am currently refactoring and will try publishing inside the foreachPartition loop. Below is the code showing the way it is currently written, thanks!

    object myData {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[8]", "Data", Seconds(10))
        ssc.checkpoint("checkpoint")
        val topicMap = Map("pagehit.data" -> 1)
        val factory = new ConnectionFactory()
        factory.setUsername("officialUsername")
        factory.setPassword("crypticPassword")
        factory.setVirtualHost("/")
        factory.setHost("rabbit-env")
        factory.setPort()
        val connection = factory.newConnection()
        val SQLChannel = connection.createChannel()
        SQLChannel.queueDeclare("SQLQueue", true, false, false, null)
        val Pipe = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1", topicMap).map(_._2)
        //PARSE SOME JSON ETC
        windowStream.foreachRDD(pagehit => {
          val mongoClient = MongoClient("my-mongodb")
          val db = mongoClient("myClient")
          val SQLCollection = db("SQLCalls")
          val callArray = pagehit.map(_._1).collect
          val avg = (callArray.reduceLeft[Long](_+_))/callArray.length
          val URL = pagehit.take(1).map(_._2)
          SQLCollection += MongoDBObject("URL" -> URL(0).substring(7, URL(0).length - 1), "Avg Page Load Time" -> avg)
          val toBuildJSON = Seq(baseMsg, avg.toString, closingBrace)
          val byteArray = toBuildJSON.mkString.getBytes()
          SQLChannel.basicPublish("", "SQLQueue", null, byteArray)
        })

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p11445.html
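In this version the channel is created at the top of main, so the foreachRDD closure captures `SQLChannel`. With `ssc.checkpoint` enabled, Spark Streaming serializes the DStream graph, closures included, whenever it writes a checkpoint. That appears to be where the NotSerializableException in the original post comes from, since CheckpointWriter.write and DStreamGraph.writeObject appear in that trace. One common workaround is to keep the channel in a Scala `object` behind a `lazy val`. Function objects then reach the channel through the singleton instead of storing it, and the channel is created at most once per JVM (once on the driver, and once per executor if used inside foreachPartition). The sketch below uses a hypothetical `StandInChannel` class so it runs without RabbitMQ. In the real job, the lazy val body would build the channel with ConnectionFactory as in the code above. `ChannelHolder` and `HolderPublisher` are also hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for com.rabbitmq.client.Channel: deliberately NOT Serializable.
class StandInChannel {
  def basicPublish(body: Array[Byte]): Unit = println(s"published ${body.length} bytes")
}

// Hypothetical per-JVM holder: the channel is built on first use, at most once per JVM.
object ChannelHolder {
  @volatile var created = 0 // counts constructions, for the demo only
  lazy val channel: StandInChannel = { created += 1; new StandInChannel }
}

// A function object with no fields: it reaches the channel through ChannelHolder
// instead of storing it, so serializing the function never touches the channel.
final class HolderPublisher extends (String => Unit) with Serializable {
  def apply(msg: String): Unit = ChannelHolder.channel.basicPublish(msg.getBytes("UTF-8"))
}

object HolderDemo {
  // True if Java serialization accepts obj.
  def serializes(obj: AnyRef): Boolean =
    try { new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj); true }
    catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    val publish = new HolderPublisher
    println(s"publisher serializable: ${serializes(publish)}")
    publish("a")
    publish("b")
    println(s"channels created: ${ChannelHolder.created}")
  }
}
```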
Writing to RabbitMQ
I have been trying to write to RabbitMQ in my Spark Streaming app, and I receive the exception below:

    java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN

Does anyone have experience sending their data to Rabbit? I am using the basicPublish call like so:

    SQLChannel.basicPublish("", "SQLQueue", null, byteArray)

I think I read earlier that Spark may be trying to serialize the data. From what I have seen, that cannot be done with a byte array, and a byte array is what Rabbit requires. When I run Spark locally, my data is actually being sent to the queue, but I have a feeling it wouldn't work in production on a cluster. Below is the full trace of the exception. Please let me know if anyone has an idea of what the problem is, thanks!

    14/08/03 12:20:00 ERROR actor.OneForOneStrategy: com.rabbitmq.client.impl.ChannelN
    java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorker
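The frames CheckpointWriter.write and DStreamGraph.writeObject suggest the exception is raised while Spark Streaming serializes the DStream graph for a checkpoint, not while it handles the byte array; byte arrays serialize fine. A Scala closure is compiled into a function object that stores the values it captures. So a channel created on the driver and referenced inside a function passed to foreachRDD (or map, foreachPartition, and so on) is serialized along with it. The plain-Scala sketch below makes that explicit with two small function classes and Java serialization, which Spark uses for closures and checkpoints. `NonSerializableChannel`, `CapturingPublisher`, and `PerPartitionPublisher` are hypothetical. The second class creates its channel inside the function body, which corresponds to opening the connection inside `rdd.foreachPartition { partition => ... }`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for com.rabbitmq.client.impl.ChannelN: not Serializable.
class NonSerializableChannel {
  def basicPublish(body: Array[Byte]): Unit = println(s"published ${body.length} bytes")
}

// Like the original job: the channel is created outside the function and kept
// as a field, the way a closure keeps the values it captures.
final class CapturingPublisher(channel: NonSerializableChannel)
    extends (Iterator[String] => Unit) with Serializable {
  def apply(partition: Iterator[String]): Unit =
    partition.foreach(msg => channel.basicPublish(msg.getBytes("UTF-8")))
}

// Per-partition pattern: the channel is created inside apply, i.e. on whichever
// JVM runs the function, so it never has to be serialized.
final class PerPartitionPublisher extends (Iterator[String] => Unit) with Serializable {
  def apply(partition: Iterator[String]): Unit = {
    val channel = new NonSerializableChannel
    partition.foreach(msg => channel.basicPublish(msg.getBytes("UTF-8")))
  }
}

object ClosureCapture {
  // True if Java serialization accepts obj.
  def serializes(obj: AnyRef): Boolean =
    try { new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj); true }
    catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(s"capturing:     ${serializes(new CapturingPublisher(new NonSerializableChannel))}")
    println(s"per-partition: ${serializes(new PerPartitionPublisher)}")
  }
}
```

Opening a connection per partition per batch has a cost. A connection pool or a lazily initialized per-JVM singleton are the usual ways to amortize it.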
Re: Spark Streaming Error Help -> ERROR actor.OneForOneStrategy: key not found:
I think I have found my answers, but if anyone has thoughts, please share. After testing for a while, I think the error doesn't have any effect on the process. I think there must be elements left in the window from the last run; otherwise my system is completely out of whack. Please let me know if any of this looks incorrect, thanks!

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Error-Help-ERROR-actor-OneForOneStrategy-key-not-found-tp8746p8750.html
Spark Streaming Error Help -> ERROR actor.OneForOneStrategy: key not found:
I am getting this "ERROR actor.OneForOneStrategy: key not found:" exception when I run my code, and I'm not sure where it is looking for a key. My setup: I send packets to a third-party service, which then uses a webhook to hit one of our servers, which then logs them using Kafka. I am just trying to total up the number of elements/packets that come in, using ".countByWindow(Seconds(30), Seconds(15)).print()". I don't see anything in the docs about countByWindow requiring a key-value pair. I still see counts in the output, so I don't think the exception is hurting anything...

I am also confused about some of the counts. When I start Spark up, it has been producing counts greater than zero even though I haven't sent any data packets since the start of that run. Are there packets saved from the last session that didn't get pushed through? This seems odd. I would expect a count of 0 if I haven't sent any data... I'll post the code and the exception trace below, thanks for any help!

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf

    object RunningPageTotal {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "PageHitTotals", Seconds(5))
        ssc.checkpoint("checkpoint")
        val topicMap = Map("User.Profile.Page.Hits" -> 1)
        val pageHitStream = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1", topicMap).map(_._2)
        // print() returns Unit, so there is nothing useful to assign here
        pageHitStream.countByWindow(Seconds(30), Seconds(15)).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    14/07/03 12:42:25 INFO dstream.MappedDStream: Time 1404409345000 ms is invalid as zeroTime is 1404409335000 ms and slideDuration is 15000 ms and difference is 1 ms
    14/07/03 12:42:25 ERROR actor.OneForOneStrategy: key not found: 1404409345000 ms
    java.util.NoSuchElementException: key not found: 1404409345000 ms
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
        at org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Error-Help-ERROR-actor-OneForOneStrategy-key-not-found-tp8746.html
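The INFO line just before the error reports that a batch time was rejected because it is not a whole number of slide intervals after the stream's zeroTime. Below is a sketch of that check as the log message describes it. It is a model of the message, not Spark's source, and the function name is hypothetical. With a 5-second batch interval and a 15-second slide, only every third batch time lines up with the window. So an INFO line for the other batch times looks expected, though this alone does not explain the separate "key not found" error.

```scala
// Model of the validity rule described by the INFO log line (hypothetical names).
object BatchTimeCheck {
  // A batch time produces output for a DStream only if it lies strictly after
  // zeroTime and is a whole number of slide intervals away from it.
  def isTimeValid(timeMs: Long, zeroTimeMs: Long, slideMs: Long): Boolean =
    timeMs > zeroTimeMs && (timeMs - zeroTimeMs) % slideMs == 0

  def main(args: Array[String]): Unit = {
    val zero = 1404409335000L // zeroTime from the log above
    val batchMs = 5000L       // Seconds(5) batch interval
    val slideMs = 15000L      // countByWindow(Seconds(30), Seconds(15)) slide
    for (i <- 1 to 6) {
      val t = zero + i * batchMs
      println(s"$t ms -> ${if (isTimeValid(t, zero, slideMs)) "window output" else "skipped"}")
    }
  }
}
```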