You probably have only 10 cores available to this job in your cluster. Since each DStream/receiver takes one core for itself, the system cannot start all of the receivers, no cores are left for batch processing, and everything blocks.
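A minimal sketch of one way around this, assuming a standalone cluster and that the job is launched with spark-submit (your code passes the master URL and jar to the StreamingContext constructor directly, so adapt as needed). The core count, master URL, hostname, and ports below are hypothetical; the point is that the total must exceed the number of receivers:

```shell
# Each socketTextStream receiver occupies one core for the lifetime of the
# job, so 15 streams need at least 16 cores: 15 for the receivers plus at
# least one to run the actual batch processing.
spark-submit \
  --class StreamAnomalyDetector \
  --total-executor-cores 20 \
  target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar \
  spark://master:7077 hostname 9990 9991 9992
```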
On Wed, Aug 6, 2014 at 3:08 AM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid> wrote:
> Hi,
>
> I am reading multiple streams from multiple ports with a single streaming
> context. I have created an array of DStreams. This works for up to 10
> streams, but if I go over that (I have checked with 15 and 20 streams),
> the Spark streaming task gets stuck. I waited for 10 minutes (2.2 min in
> the attached screenshot) and it still did not go through. The attached
> streaming UI shows where it gets stuck.
>
> If this is not the right way to read multiple streams, what is the
> alternative? I don't want to union the streams; I want to read them
> simultaneously, in parallel.
>
> object StreamAnomalyDetector {
>
>   def calculate(sumOfSquare: Double, sumOfN: Double, n: Int): (Int, (Double, Double, Double)) = {
>     val mean = sumOfN / n
>     val variance = sumOfSquare / n - math.pow(mean, 2)
>     (n, (mean, variance, math.sqrt(variance)))
>   }
>
>   def main(args: Array[String]) {
>     if (args.length < 3) {
>       System.err.println("Usage: StreamAnomalyDetector <master> <hostname> <port>")
>       System.exit(1)
>     }
>     // Setting system properties
>     // System.setProperty("spark.cores.max", "3")
>     System.setProperty("spark.executor.memory", "5g")
>
>     // Create the context
>     val ssc = new StreamingContext(args(0), "StreamAnomalyDetector",
>       Milliseconds(1000),
>       System.getenv("SPARK_HOME"),
>       List("target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar"))
>
>     // HDFS path to checkpoint old data
>     ssc.checkpoint("hdfs://host-10-20-20-17.novalocal:9000/user/hduser/checkpointing/")
>
>     // Array for multiple streams
>     val eegStreams = new Array[org.apache.spark.streaming.dstream.DStream[String]](args.length - 2)
>
>     // Create the NetworkInputDStreams
>     for (a <- 0 to (args.length - 3)) {
>       // Multiple DStreams into the array
>       eegStreams(a) = ssc.socketTextStream(args(1), args(a + 2).toInt,
>         StorageLevel.MEMORY_AND_DISK_SER)
>       val sums = eegStreams(a).map(x => (math.pow(x.toDouble, 2), x.toDouble, 1))
>         .reduceByWindow((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3),
>           (a, b) => (a._1 - b._1, a._2 - b._2, a._3 - b._3),
>           Seconds(4), Seconds(4))
>       val meanAndSD = sums.map(x => calculate(x._1, x._2, x._3))
>       meanAndSD.saveAsTextFiles("hdfs://host-10-20-20-17.novalocal:9000/user/hduser/output/" + (a + 1))
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> Regards,
> Laeeq
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
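Side note: the windowed statistics in the quoted code can be sanity-checked outside Spark. A minimal self-contained sketch using a plain Seq in place of the windowed stream; the object name and the sample values are mine, not from the thread, but the formula mirrors calculate():

```scala
// Sanity check of the mean/variance formula used in calculate(), run on a
// plain collection instead of a reduceByWindow result. Values are made up.
object VarianceCheck {
  // Same formula as in the quoted code: variance = E[x^2] - (E[x])^2
  def calculate(sumOfSquare: Double, sum: Double, n: Int): (Int, (Double, Double, Double)) = {
    val mean = sum / n
    val variance = sumOfSquare / n - math.pow(mean, 2)
    (n, (mean, variance, math.sqrt(variance)))
  }

  def main(args: Array[String]): Unit = {
    val window = Seq(1.0, 2.0, 3.0, 4.0)
    // Mirror the streaming pipeline: map each value to (x^2, x, 1), then sum.
    val (sumSq, sum, n) = window
      .map(x => (x * x, x, 1))
      .reduce((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))
    val (count, (mean, variance, sd)) = calculate(sumSq, sum, n)
    println(s"n=$count mean=$mean variance=$variance sd=$sd")
    // prints: n=4 mean=2.5 variance=1.25 sd=1.118033988749895
  }
}
```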