Bind exception while running FlumeEventCount
Hi, I have installed spark-1.1.0 and apache flume 1.4 to run the streaming example FlumeEventCount. Previously the code was working fine; now I am facing the issues below. My Flume agent is running properly and is able to write the file. The command I use is:

    bin/run-example org.apache.spark.examples.streaming.FlumeEventCount 172.29.17.178 65001

14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: org.jboss.netty.channel.ChannelException: Failed to bind to: /172.29.17.178:65001
14/11/07 23:19:23 INFO flume.FlumeReceiver: Flume receiver stopped
14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
14/11/07 23:19:23 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.jboss.netty.channel.ChannelException: Failed to bind to: /172.29.17.178:65001
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
        at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
        at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:344)
        at sun.nio.ch.Net.bind(Net.java:336)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:199)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
        at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        ... 3 more
14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0
14/11/07 23:19:23 INFO receiver.BlockGenerator: Stopping BlockGenerator
14/11/07 23:19:23 INFO util.RecurringTimer: Stopped timer for BlockGenerator after time 1415382563200
14/11/07 23:19:23 INFO receiver.BlockGenerator: Waiting for block pushing thread
14/11/07 23:19:23 INFO receiver.BlockGenerator: Pushing out the last 0 blocks
14/11/07 23:19:23 INFO receiver.BlockGenerator: Stopped block pushing thread
14/11/07 23:19:23 INFO receiver.BlockGenerator: Stopped BlockGenerator
14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Waiting for executor stop is over
14/11/07 23:19:23 ERROR receiver.ReceiverSupervisorImpl: Stopped executor with error: org.jboss.netty.channel.ChannelException: Failed to bind to: /172.29.17.178:65001
14/11/07 23:19:23 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.jboss.netty.channel.ChannelException: Failed to bind to: /172.29.17.178:65001
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
        at
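For anyone reproducing this, a minimal Scala sketch (not from the thread) that probes whether the address is bindable before starting the receiver; the host and port below are the ones from the report and are otherwise arbitrary:

    import java.net.{InetSocketAddress, ServerSocket}

    object PortCheck {
      // Returns true if a server socket can be bound to host:port right now.
      def isBindable(host: String, port: Int): Boolean =
        try {
          val socket = new ServerSocket()
          try { socket.bind(new InetSocketAddress(host, port)); true }
          finally socket.close()
        } catch {
          case _: java.io.IOException => false  // BindException extends IOException
        }

      def main(args: Array[String]): Unit =
        println(s"bindable: ${isBindable("172.29.17.178", 65001)}")
    }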
Re: Replacing Spark's native scheduler with Sparrow
On Sun, Nov 9, 2014 at 1:51 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This causes a scalability vs. latency tradeoff - if your limit is 1000 tasks per second (simplifying from 1500), you could either configure it to use 100 receivers at 100 ms batches (10 blocks/sec), or 1000 receivers at 1 second batches.

This raises an interesting question, TD. Do we have a benchmark for Spark Streaming that tests it at the extreme for some key metric, perhaps processed messages per second per node? Something that would stress Spark's ability to process tasks quickly enough. Given such a benchmark, it would probably be interesting to see how -- if at all -- Sparrow has an impact on Spark Streaming's performance. Nick
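TD's numbers can be sanity-checked with a quick sketch (my reading, not code from the thread; the assumed model is that each receiver emits one block, and hence one task, per block interval):

    // tasks/sec generated by N receivers with a given block interval
    def receiverTasksPerSec(numReceivers: Int, blockIntervalMs: Int): Double =
      numReceivers * (1000.0 / blockIntervalMs)

    receiverTasksPerSec(100, 100)    // = 1000.0 -> 100 receivers, 100 ms blocks
    receiverTasksPerSec(1000, 1000)  // = 1000.0 -> 1000 receivers, 1 s blocks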
Re: Replacing Spark's native scheduler with Sparrow
Too bad Nick, I don't have anything immediately ready that tests Spark Streaming with those extreme settings. :)

On Mon, Nov 10, 2014 at 9:56 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: [snip: full quote of the message above]
getting exception when trying to build spark from master
Getting an exception while trying to build Spark in spark-core:

[ERROR]
     while compiling: /Users/dev/tellapart_spark/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
        during phase: typer
     library version: version 2.10.4
    compiler version: version 2.10.4
  reconstructed args: -deprecation -feature -classpath

  last tree to typer: Ident(enumDispatcher)
              symbol: value enumDispatcher (flags: <triedcooking>)
   symbol definition: val enumDispatcher: java.util.EnumSet[javax.servlet.DispatcherType]
                 tpe: java.util.EnumSet[javax.servlet.DispatcherType]
       symbol owners: value enumDispatcher -> value $anonfun -> method addFilters -> object JettyUtils -> package ui
      context owners: value $anonfun -> value $anonfun -> method addFilters -> object JettyUtils -> package ui

== Enclosing template or block ==
[snip: scalac's reconstructed tree dump for JettyUtils.addFilters]
Re: getting exception when trying to build spark from master
It looks like the Jenkins maven builds are broken, too. Based on the Jenkins logs, I think that this pull request may have broken things (although I'm not sure why): https://github.com/apache/spark/pull/3030#issuecomment-62436181

On Mon, Nov 10, 2014 at 1:42 PM, Sadhan Sood sadhan.s...@gmail.com wrote: [snip: full quote of the compiler error above]
Http client dependency conflict when using AWS
I'm wondering why https://issues.apache.org/jira/browse/SPARK-3638 only updated the httpclient version for the kinesis-asl profile and left the base dependencies unchanged. Spark built without that profile still hits the same

    java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V

when using AWS components in general, not just Kinesis (e.g. AmazonCloudWatchAsyncClient). Re-reading the "Dependency Hell in Spark applications" thread from September didn't shed any light on the subject. As noted in that thread, userClassPathFirst doesn't help. Is there a reason not to depend on an updated version of httpclient for all Spark builds, as opposed to just kinesis-asl?
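Until that changes, one application-side workaround is to pin httpclient yourself. A build.sbt sketch (assumptions: the application builds with sbt, and the exact version to pin is a guess; the <init>(SchemeRegistry, DnsResolver) constructor the AWS SDK calls was added in httpclient 4.2):

    // Force a newer httpclient than the one Spark pulls in transitively.
    dependencyOverrides += "org.apache.httpcomponents" % "httpclient" % "4.2.6"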
Spark 1.1.1 release
Hi everyone, I am the release manager for 1.1.1, and I am preparing to cut a release tonight at midnight. 1.1.1 is a maintenance release that will ship several important bug fixes to users of Spark 1.1. Many users are waiting for these fixes, so I would like to release it as soon as possible. At this point, I believe we have already backported all critical fixes from master other than a few known ones. Below is a list of issues that have been backported into 1.1.1. If there are other critical fixes in the master branch that are not in this list, please let me know and I will take a look. https://issues.apache.org/jira/browse/SPARK-3653?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%201.1.1%20AND%20fixVersion%20%3D%201.2.0 Best, - Andrew
Re: Bind exception while running FlumeEventCount
Looks like that port is not available because another app is using that port. Can you take a look at netstat -a and use a port that is free?

Thanks, Hari

On Fri, Nov 7, 2014 at 2:05 PM, Jeniba Johnson jeniba.john...@lntinfotech.com wrote: [snip: full quote of the original report and error log above]
Re: getting exception when trying to build spark from master
I reverted the patch locally, seems to be working for me.

On Mon, Nov 10, 2014 at 6:00 PM, Patrick Wendell pwend...@gmail.com wrote: I reverted that patch to see if it fixes it.

On Mon, Nov 10, 2014 at 1:45 PM, Josh Rosen rosenvi...@gmail.com wrote: It looks like the Jenkins maven builds are broken, too. Based on the Jenkins logs, I think that this pull request may have broken things (although I'm not sure why): https://github.com/apache/spark/pull/3030#issuecomment-62436181

On Mon, Nov 10, 2014 at 1:42 PM, Sadhan Sood sadhan.s...@gmail.com wrote: [snip: full quote of the compiler error above]
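For anyone bisecting the same break locally, a sketch of the revert (the SHA is a placeholder for the merge commit of PR #3030, not a real hash):

    git revert <merge-commit-sha>
    sbt/sbt clean compile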
Re: MatrixFactorizationModel predict(Int, Int) API
I tested 2 different implementations to generate the predicted ranked list. The first version uses a cartesian of user and product features and then generates a predicted value for each (user, product) key. The second version does a collect on the skinny matrix (most likely products) and then broadcasts it to every node, which computes the predicted values. cartesian is slower than the broadcast version, but in the broadcast version the shuffle time is also significant. The bottleneck is the groupBy on the (user, product) composite key followed by a local sort to generate the top-K.

The third version I thought of was to use the top-K predict API, but this works only if top-K is bounded by a small number. If top-K is large (say 100K) it does not work, since then it is bounded by master memory.

The block-wise cross product idea will optimize the groupBy, right? We break the user and product feature matrices into blocks (re-using the ALS partitioning) and then use (userBlock, productBlock) as the key in place of (user, product). Does this help reduce the shuffle size?

On Thu, Nov 6, 2014 at 5:07 PM, Xiangrui Meng men...@gmail.com wrote: There is a JIRA for it: https://issues.apache.org/jira/browse/SPARK-3066 The easiest case is when one side is small. If both sides are large, this is a super-expensive operation. We can do block-wise cross product and then find top-k for each user. Best, Xiangrui

On Thu, Nov 6, 2014 at 4:51 PM, Debasish Das debasish.da...@gmail.com wrote: model.recommendProducts can only be called from the master then? I have a set of 20% users on whom I am performing the test; the 20% users are in an RDD. If I have to collect them all to the master node and then call model.recommendProducts, that's an issue. Any idea how to optimize this so that we can calculate MAP statistics on large samples of data?

On Thu, Nov 6, 2014 at 4:41 PM, Xiangrui Meng men...@gmail.com wrote: The ALS model contains RDDs, so you cannot put `model.recommendProducts` inside an RDD closure `userProductsRDD.map`. -Xiangrui

On Thu, Nov 6, 2014 at 4:39 PM, Debasish Das debasish.da...@gmail.com wrote: I reproduced the problem in the mllib tests (ALSSuite.scala) using the following code:

    val arrayPredict = userProductsRDD.map { case (user, product) =>
      val recommendedProducts = model.recommendProducts(user, products)
      val productScore = recommendedProducts.find { x => x.product == product }
      require(productScore != None)
      productScore.get
    }.collect

    arrayPredict.foreach { elem =>
      if (allRatings.get(elem.user, elem.product) != elem.rating)
        fail("Prediction APIs don't match")
    }

If the usage of model.recommendProducts is correct, the test fails with the same error I sent before:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 316.0 failed 1 times, most recent failure: Lost task 0.0 in stage 316.0 (TID 79, localhost): scala.MatchError: null
        org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:825)
        org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:81)

It is a blocker for me and I am debugging it. I will open up a JIRA if this is indeed a bug. Do I have to cache the models to make userFeatures.lookup(user).head work?

On Mon, Nov 3, 2014 at 9:24 PM, Xiangrui Meng men...@gmail.com wrote: Was the user present in training? We can put a check there and return NaN if the user is not included in the model.
-Xiangrui

On Mon, Nov 3, 2014 at 5:25 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am testing MatrixFactorizationModel.predict(user: Int, product: Int), but the code fails on userFeatures.lookup(user).head. In computeRmse, MatrixFactorizationModel.predict(RDD[(Int, Int)]) has been called, and that API has been used in all the test cases. I can perhaps refactor my code to do the same, but I was wondering whether people test the lookup(user) version of the code. Do I need to cache the model to make it work? I think right now the default is STORAGE_AND_DISK... Thanks. Deb
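A sketch of the broadcast variant discussed above (assumptions: factors are rank-sized Array[Double]s, the product-factor matrix fits on each executor, and k is small; this is illustrative, not the thread's actual code):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // pair-RDD implicits in Spark 1.1
    import org.apache.spark.rdd.RDD

    def recommendTopK(
        sc: SparkContext,
        userFeatures: RDD[(Int, Array[Double])],
        productFeatures: RDD[(Int, Array[Double])],
        k: Int): RDD[(Int, Array[(Int, Double)])] = {
      // Collect the skinny product-factor matrix and ship it to every node.
      val products = sc.broadcast(productFeatures.collect())
      userFeatures.mapValues { uf =>
        products.value.map { case (pid, pf) =>
          // score = dot(userFactors, productFactors)
          var dot = 0.0
          var i = 0
          while (i < uf.length) { dot += uf(i) * pf(i); i += 1 }
          (pid, dot)
        }.sortBy(-_._2).take(k)  // local top-k per user, no shuffle
      }
    }

Since the top-k is computed locally per user partition, this avoids the groupBy/shuffle on the (user, product) composite key entirely; the tradeoff is that every executor holds the full product-factor matrix.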
thrift jdbc server probably running queries as hive query
I was testing out the Spark Thrift JDBC server by running a simple query in the beeline client. Spark itself is running on a YARN cluster. However, when I run a query in beeline, I see no running jobs in the Spark UI (completely empty), and the YARN UI seems to indicate that the submitted query is being run as a MapReduce job. This is probably also indicated by the logs, but I am not completely sure:

2014-11-11 00:19:00,492 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1
2014-11-11 00:19:00,877 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,152 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,425 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication is deprecated. Instead, use mapreduce.client.submit.file.replication
2014-11-11 00:19:04,516 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,607 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,639 WARN mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this
2014-11-11 00:00:08,806 INFO input.FileInputFormat (FileInputFormat.java:listStatus(287)) - Total input paths to process : 14912
2014-11-11 00:00:08,864 INFO lzo.GPLNativeCodeLoader (GPLNativeCodeLoader.java:<clinit>(34)) - Loaded native gpl library
2014-11-11 00:00:08,866 INFO lzo.LzoCodec (LzoCodec.java:<clinit>(76)) - Successfully loaded & initialized native-lzo library [hadoop-lzo rev 8e266e052e423af592871e2dfe09d54c03f6a0e8]
2014-11-11 00:00:09,873 INFO input.CombineFileInputFormat (CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 194541317
2014-11-11 00:00:10,017 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:615
2014-11-11 00:00:10,095 INFO mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1414084656759_0115
2014-11-11 00:00:10,241 INFO impl.YarnClientImpl (YarnClientImpl.java:submitApplication(167)) - Submitted application application_1414084656759_0115

It seems like the query is being run as a Hive query instead of a Spark query. The same query works fine when run from the spark-sql CLI.
Re: thrift jdbc server probably running queries as hive query
Did the SQL run successfully? And what SQL are you running?
Checkpoint bugs in GraphX
Hi, all. I'm not sure whether someone has reported this bug already: there should be a checkpoint() method in EdgeRDD and VertexRDD, as follows:

    override def checkpoint(): Unit = {
      partitionsRDD.checkpoint()
    }

Currently, EdgeRDD and VertexRDD use RDD.checkpoint(), which only checkpoints the edges/vertices but not the critical partitionsRDD.

Also, the variables (partitionsRDD and targetStorageLevel) in EdgeRDD and VertexRDD should be @transient:

    class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
        @transient val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
        @transient val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
      extends RDD[Edge[ED]](partitionsRDD.context,
        List(new OneToOneDependency(partitionsRDD))) {

    class VertexRDD[@specialized VD: ClassTag](
        @transient val partitionsRDD: RDD[ShippableVertexPartition[VD]],
        @transient val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
      extends RDD[(VertexId, VD)](partitionsRDD.context,
        List(new OneToOneDependency(partitionsRDD))) {

These two bugs usually lead to a StackOverflowError in iterative applications written with GraphX.
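A sketch of the iterative pattern where this typically surfaces (graph, numIterations, step(), and the checkpoint cadence are all assumed for illustration); with the fix, checkpoint() forwards to partitionsRDD and actually truncates the lineage:

    var g = graph.cache()                     // graph: an existing Graph[VD, ED], assumed
    for (i <- 1 to numIterations) {           // numIterations: assumed
      g = g.mapVertices((id, attr) => step(id, attr)).cache()  // step(): assumed
      if (i % 10 == 0) {
        g.vertices.checkpoint()   // with the fix, checkpoints partitionsRDD
        g.edges.checkpoint()
        g.vertices.count()        // force materialization so the cut happens
        g.edges.count()
      }
    }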
Re: thrift jdbc server probably running queries as hive query
Hey Sadhan, I really don't think this is a Spark log... Unlike Shark, Spark SQL doesn't even provide a Hive mode to let you execute queries against Hive. Would you please check whether there is an existing HiveServer2 running there? Spark SQL's HiveThriftServer2 is just a Spark port of HiveServer2, and they share the same default listening port. I guess the Thrift server didn't start successfully because HiveServer2 occupied the port, and your beeline session was probably connected to HiveServer2 instead.

Cheng

On 11/11/14 8:29 AM, Sadhan Sood wrote: [snip: full quote of the original report and logs above]
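A quick way to test this theory (both servers default to port 10000; moving the Spark Thrift server to another port via --hiveconf is the standard knob):

    # who is listening on the default HiveServer2/Thrift-server port?
    netstat -nlp | grep 10000

    # start the Spark Thrift server on a different port instead
    ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001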
Re: Checkpoint bugs in GraphX
I have been trying to fix this bug. The related PR: https://github.com/apache/spark/pull/2631

-- Original --
From: Xu Lijie (lijie@gmail.com)
Date: Tue, Nov 11, 2014 10:19 AM
To: user (u...@spark.apache.org); dev (dev@spark.apache.org)
Subject: Checkpoint bugs in GraphX

[snip: full quote of the original post above]
Discuss how to do checkpoint more efficently
Hi, all. I want to seek suggestions on how to do checkpointing more efficiently, especially for iterative applications written with GraphX.

For iterative applications, the lineage of a job can become very long, which easily causes a stack overflow error. A solution is checkpointing. However, checkpointing is time-consuming and not easy for ordinary users to perform correctly (e.g., deciding which RDDs need checkpointing and when to checkpoint them). Moreover, to shorten the lineage, iterative applications need to checkpoint frequently (e.g., every 10 iterations). As a result, checkpointing is too heavy for iterative applications, especially those written with GraphX.

I'm wondering if there is an elegant way to solve this: shortening the lineage while also saving the intermediate data/results in a lightweight way. Maybe we can develop a new API like checkpoint(StorageLevel), which combines the behavior of cache() with that of the current checkpoint(). (A sketch of the heavyweight workaround follows the example lineage below.)

Example: the lineage is already very long in the first iteration of a GraphX job, even without checkpointing.

[Iter 1][DEBUG] (2) EdgeRDD[33] at RDD at EdgeRDD.scala:35
 | EdgeRDD ZippedPartitionsRDD2[32] at zipPartitions at ReplicatedVertexView.scala:114
 | EdgeRDD MapPartitionsRDD[12] at mapPartitionsWithIndex at EdgeRDD.scala:169
 | MappedRDD[11] at map at Graph.scala:392
 | MappedRDD[10] at distinct at KCoreCommonDebug.scala:115
 | ShuffledRDD[9] at distinct at KCoreCommonDebug.scala:115
 +-(2) MappedRDD[8] at distinct at KCoreCommonDebug.scala:115
 | FilteredRDD[7] at filter at KCoreCommonDebug.scala:112
 | MappedRDD[6] at map at KCoreCommonDebug.scala:102
 | MappedRDD[5] at repartition at KCoreCommonDebug.scala:101
 | CoalescedRDD[4] at repartition at KCoreCommonDebug.scala:101
 | ShuffledRDD[3] at repartition at KCoreCommonDebug.scala:101
 +-(2) MapPartitionsRDD[2] at repartition at KCoreCommonDebug.scala:101
 | D:\graphData\verylarge.txt MappedRDD[1] at textFile at KCoreCommonDebug.scala:100
 | D:\graphData\verylarge.txt HadoopRDD[0] at textFile at KCoreCommonDebug.scala:100
 | ShuffledRDD[31] at partitionBy at ReplicatedVertexView.scala:112
 +-(2) ReplicatedVertexView.updateVertices - shippedVerts false false (broadcast) MapPartitionsRDD[30] at mapPartitions at VertexRDD.scala:347
 | VertexRDD ZippedPartitionsRDD2[28] at zipPartitions at VertexRDD.scala:174
 | VertexRDD, VertexRDD MapPartitionsRDD[18] at mapPartitions at VertexRDD.scala:441
 | MapPartitionsRDD[17] at mapPartitions at VertexRDD.scala:457
 | ShuffledRDD[16] at ShuffledRDD at RoutingTablePartition.scala:36
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[15] at mapPartitions at VertexRDD.scala:452
 | EdgeRDD MapPartitionsRDD[12] at mapPartitionsWithIndex at EdgeRDD.scala:169
 | MappedRDD[11] at map at Graph.scala:392
 | MappedRDD[10] at distinct at KCoreCommonDebug.scala:115
 | ShuffledRDD[9] at distinct at KCoreCommonDebug.scala:115
 +-(2) MappedRDD[8] at distinct at KCoreCommonDebug.scala:115
 | FilteredRDD[7] at filter at KCoreCommonDebug.scala:112
 | MappedRDD[6] at map at KCoreCommonDebug.scala:102
 | MappedRDD[5] at repartition at KCoreCommonDebug.scala:101
 | CoalescedRDD[4] at repartition at KCoreCommonDebug.scala:101
 | ShuffledRDD[3] at repartition at KCoreCommonDebug.scala:101
 +-(2) MapPartitionsRDD[2] at repartition at KCoreCommonDebug.scala:101
 | D:\graphData\verylarge.txt MappedRDD[1] at textFile at KCoreCommonDebug.scala:100
 | D:\graphData\verylarge.txt HadoopRDD[0] at textFile at KCoreCommonDebug.scala:100
 | VertexRDD ZippedPartitionsRDD2[26] at zipPartitions at VertexRDD.scala:200
 | VertexRDD, VertexRDD MapPartitionsRDD[18] at mapPartitions at VertexRDD.scala:441
 | MapPartitionsRDD[17] at mapPartitions at VertexRDD.scala:457
 | ShuffledRDD[16] at ShuffledRDD at RoutingTablePartition.scala:36
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[15] at mapPartitions at VertexRDD.scala:452
 | EdgeRDD MapPartitionsRDD[12] at mapPartitionsWithIndex at EdgeRDD.scala:169
 | MappedRDD[11] at map at Graph.scala:392
 | MappedRDD[10] at distinct at KCoreCommonDebug.scala:115
 | ShuffledRDD[9] at distinct at KCoreCommonDebug.scala:115
 +-(2) MappedRDD[8] at distinct at KCoreCommonDebug.scala:115
 | FilteredRDD[7] at filter at KCoreCommonDebug.scala:112
 | MappedRDD[6] at map at KCoreCommonDebug.scala:102
 | MappedRDD[5] at repartition at KCoreCommonDebug.scala:101
 | CoalescedRDD[4] at repartition at KCoreCommonDebug.scala:101
 | ShuffledRDD[3] at repartition at KCoreCommonDebug.scala:101
 +-(2) MapPartitionsRDD[2] at repartition at KCoreCommonDebug.scala:101
 | D:\graphData\verylarge.txt MappedRDD[1] at textFile at KCoreCommonDebug.scala:100
 |
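As promised above, a sketch of the heavyweight workaround available today, periodic checkpointing every N iterations (the checkpoint directory, update function, and cadence are all assumed):

    sc.setCheckpointDir("hdfs:///tmp/graphx-checkpoints")  // assumed path
    var ranks = initialVertices.cache()       // initialVertices: assumed RDD
    for (i <- 1 to maxIters) {                // maxIters: assumed
      ranks = update(ranks).cache()           // update(): one iteration, assumed
      if (i % 10 == 0) {
        ranks.checkpoint()                    // cut the lineage ...
        ranks.count()                         // ... and force the write to HDFS
      }
    }

A checkpoint(StorageLevel) API as proposed would ideally keep the lineage cut but replace the HDFS write with a (possibly replicated) in-memory/on-disk copy.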
Re: Checkpoint bugs in GraphX
Many methods (e.g., graph.edges.count) do not require serializing the EdgeRDD or VertexRDD at all; moreover, partitionsRDD (and targetStorageLevel) are needed only on the driver, so whether or not they are serialized has no effect.

-- Original --
From: Xu Lijie (lijie@gmail.com)
Date: Tue, Nov 11, 2014 11:40 AM
To: GuoQiang Li (wi...@qq.com)
Cc: user (u...@spark.apache.org); dev (dev@spark.apache.org)
Subject: Re: Checkpoint bugs in GraphX

Nice, we currently encounter a stack overflow error caused by this bug. We also found that

    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)

will not be serialized even without adding @transient. However, @transient can affect the JVM stack. Our guess is that if we do not add @transient, the pointers to partitionsRDD and targetStorageLevel are kept on the stack; otherwise, the stack does not keep any information about the two variables during serialization/deserialization. I'm wondering whether this guess is right.

2014-11-11 11:16 GMT+08:00 GuoQiang Li wi...@qq.com: I have been trying to fix this bug. The related PR: https://github.com/apache/spark/pull/2631

[snip: full quote of the original post above]
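The @transient behavior itself is easy to check in isolation. A self-contained sketch (plain Java serialization, not GraphX code):

    import java.io._

    // A field marked @transient is skipped by Java serialization and
    // comes back as null after deserialization.
    class Holder(@transient val payload: Array[Int], val name: String)
      extends Serializable

    object TransientDemo extends App {
      val bytes = new ByteArrayOutputStream()
      new ObjectOutputStream(bytes).writeObject(new Holder(Array(1, 2, 3), "h"))
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      val h = in.readObject().asInstanceOf[Holder]
      println(h.name)             // prints "h"
      println(h.payload == null)  // prints true
    }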
RE: Bind exception while running FlumeEventCount
Hi Hari,

Just to give you some background: I installed spark-1.1.0 and apache flume 1.4 with the basic configurations needed. I just wanted to confirm that this is the correct way to run the Spark Streaming examples with Flume. As for the TIME_WAIT point you mentioned, I did not understand it exactly, so I am attaching a screenshot so you can help me with it. The screenshot shows the ports listening after the program is executed.

Regards, Jeniba Johnson

-----Original Message-----
From: Hari Shreedharan [mailto:hshreedha...@cloudera.com]
Sent: Tuesday, November 11, 2014 11:04 AM
To: Jeniba Johnson
Cc: dev@spark.apache.org
Subject: RE: Bind exception while running FlumeEventCount

The socket may have been in TIME_WAIT. Can you try after a bit? The error message definitely suggests that some other app is listening on that port.

Thanks, Hari

On Mon, Nov 10, 2014 at 9:30 PM, Jeniba Johnson jeniba.john...@lntinfotech.com wrote:

Hi Hari, thanks for your kind reply. Even after killing the process ID on that port, I am still facing a similar error. The commands I use are:

    sudo lsof -i -P | grep -i listen
    kill -9 PID

However, even if I try a port that is available, the error remains the same.

Regards, Jeniba Johnson

[snip: the rest of the quoted thread, Hari's first reply and the original error log, is quoted above in full]
RE: Bind exception while running FlumeEventCount
First, can you try a different port? TIME_WAIT is basically a timeout during which a socket is being completely decommissioned before the port becomes available for binding again. If you wait a few minutes and still see a startup issue, can you also send the error logs? From what I can see, the port seems to be in use.

Thanks, Hari
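A quick way to see which state the port is in (65001 is the port from this thread):

    netstat -an | grep 65001    # look for LISTEN vs TIME_WAIT in the state column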
Re: Bind exception while running FlumeEventCount
Did you start a Flume agent to push data to the relevant port?

Thanks, Hari

On Fri, Nov 7, 2014 at 2:05 PM, Jeniba Johnson jeniba.john...@lntinfotech.com wrote: [snip: full quote of the original report and error log above]
RE: Bind exception while running FlumeEventCount
Hi Hari,

Meanwhile, I am trying a different port. I also need to confirm the installation steps for Spark and Flume with you: for installation, I just unzipped spark-1.1.0-bin-hadoop1.tar.gz and apache-flume-1.4.0-bin.tar.gz to run the Spark Streaming examples. Is this the correct way, or is there another way? Please let me know. Awaiting your kind reply.

Regards, Jeniba Johnson

From: Hari Shreedharan [mailto:hshreedha...@cloudera.com]
Sent: Tuesday, November 11, 2014 12:41 PM
To: Jeniba Johnson
Cc: dev@spark.apache.org
Subject: RE: Bind exception while running FlumeEventCount

[snip: the rest of the quoted thread, Hari's TIME_WAIT reply, the earlier messages, and the original error log, is quoted above in full]