Thanks a lot. Now it's working fine. I wasn't aware of socketTextStream; I'm not sure whether it's documented in the Spark programming guide.
On Mon, Sep 21, 2015 at 12:46 PM, Hemant Bhanawat <hemant9...@gmail.com> wrote:

> Why are you using rawSocketStream to read the data? I believe
> rawSocketStream waits for a big chunk of data before it can start
> processing it. I think what you are writing is a String, and you should
> use socketTextStream, which reads the data on a per-line basis.
>
> On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa <saiph.ka...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to build a data generator that feeds a streaming application.
>> This data generator just reads a file and sends its lines through a socket.
>> I get no errors in the logs, and the benchmark below always prints
>> "Received 0 records". Am I doing something wrong?
>>
>> import java.io.{IOException, PrintWriter}
>> import java.net.ServerSocket
>>
>> import scala.io.Source
>>
>> object MyDataGenerator {
>>
>>   def main(args: Array[String]) {
>>     if (args.length != 3) {
>>       System.err.println("Usage: MyDataGenerator <port> <file> <sleepMillis>")
>>       System.exit(1)
>>     }
>>     // Parse the arguments using a pattern match
>>     val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>>
>>     val serverSocket = new ServerSocket(port)
>>     println("Listening on port " + port)
>>
>>     while (true) {
>>       val socket = serverSocket.accept()
>>       println("Got a new connection")
>>
>>       val out = new PrintWriter(socket.getOutputStream)
>>       try {
>>         var count = 0
>>         var startTimestamp = -1
>>         for (line <- Source.fromFile(file).getLines()) {
>>           val ts = line.substring(2, line.indexOf(',', 2)).toInt
>>           if (startTimestamp < 0)
>>             startTimestamp = ts
>>
>>           if (ts - startTimestamp <= 30) {
>>             out.println(line)
>>             count += 1
>>           } else {
>>             println(s"Emitted reports: $count")
>>             count = 0
>>             out.flush()
>>             startTimestamp = ts
>>             Thread.sleep(sleepMillis)
>>           }
>>         }
>>       } catch {
>>         case e: IOException =>
>>           println("Client disconnected")
>>           socket.close()
>>       }
>>     }
>>   }
>> }
>>
>> import org.apache.spark.SparkConf
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.{Duration, StreamingContext}
>>
>> object Benchmark {
>>   def main(args: Array[String]) {
>>     if (args.length != 4) {
>>       System.err.println("Usage: Benchmark <numStreams> <host> <port> <batchMillis>")
>>       System.exit(1)
>>     }
>>
>>     val (numStreams, host, port, batchMillis) =
>>       (args(0).toInt, args(1), args(2).toInt, args(3).toInt)
>>     val sparkConf = new SparkConf()
>>     sparkConf.setAppName("BenchMark")
>>     sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
>>     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>     sparkConf.set("spark.executor.extraJavaOptions",
>>       " -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts" +
>>         " -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
>>     if (sparkConf.getOption("spark.master") == None) {
>>       // Master not set, as this was not launched through spark-submit.
>>       // Setting master as local.
>>       sparkConf.setMaster("local[*]")
>>     }
>>
>>     // Create the context
>>     val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>
>>     val rawStreams = (1 to numStreams).map(_ =>
>>       ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
>>     val union = ssc.union(rawStreams)
>>     union.count().map(c => s"Received $c records").print()
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> Thanks.