Why are you using rawSocketStream to read the data? I believe rawSocketStream waits for a big chunk of data before it can start processing it. Since what you are writing is a String, you should use socketTextStream instead, which reads the data on a per-line basis.
On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa <saiph.ka...@gmail.com> wrote: > Hi, > > I am trying to build a data generator that feeds a streaming application. > This data generator just reads a file and sends its lines through a socket. > I get no errors on the logs, and the benchmark below always prints > "Received 0 records". Am I doing something wrong? > > > object MyDataGenerator { > > def main(args: Array[String]) { > if (args.length != 3) { > System.err.println("Usage: RawTextSender <port> <file> <sleepMillis>") > System.exit(1) > } > // Parse the arguments using a pattern match > val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt) > > val serverSocket = new ServerSocket(port) > println("Listening on port " + port) > > > while (true) { > val socket = serverSocket.accept() > println("Got a new connection") > > > val out = new PrintWriter(socket.getOutputStream) > try { > var count = 0 > var startTimestamp = -1 > for (line <- Source.fromFile(file).getLines()) { > val ts = line.substring(2, line.indexOf(',',2)).toInt > if(startTimestamp < 0) > startTimestamp = ts > > if(ts - startTimestamp <= 30) { > out.println(line) > count += 1 > } else { > println(s"Emmited reports: $count") > count = 0 > out.flush() > startTimestamp = ts > Thread.sleep(sleepMillis) > } > } > } catch { > case e: IOException => > println("Client disconnected") > socket.close() > } > } > } > } > > > > object Benchmark { > def main(args: Array[String]) { > if (args.length != 4) { > System.err.println("Usage: RawNetworkGrep <numStreams> <host> <port> > <batchMillis>") > System.exit(1) > } > > val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), > args(2).toInt, args(3).toInt) > val sparkConf = new SparkConf() > sparkConf.setAppName("BenchMark") > > sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar")) > sparkConf.set("spark.serializer", > "org.apache.spark.serializer.KryoSerializer") > 
sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops > -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 > -XX:MaxInlineSize=300 ") > if (sparkConf.getOption("spark.master") == None) { > // Master not set, as this was not launched through Spark-submit. > Setting master as local." > sparkConf.setMaster("local[*]") > } > > // Create the context > val ssc = new StreamingContext(sparkConf, Duration(batchMillis)) > > val rawStreams = (1 to numStreams).map(_ => > ssc.rawSocketStream[String](host, port, > StorageLevel.MEMORY_ONLY_SER)).toArray > val union = ssc.union(rawStreams) > union.count().map(c => s"Received $c records").print() > ssc.start() > ssc.awaitTermination() > } > } > > Thanks. > >