Why are you using  rawSocketStream to read the data? I believe
rawSocketStream waits for a big chunk of data before it can start
processing it. I think what you are writing is a String and you should use
socketTextStream which reads the data on a per line basis.

On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa <saiph.ka...@gmail.com> wrote:

> Hi,
>
> I am trying to build a data generator that feeds a streaming application.
> This data generator just reads a file and send its lines through a socket.
> I get no errors on the logs, and the benchmark bellow always prints
> "Received 0 records". Am I doing something wrong?
>
>
> object MyDataGenerator {
>
>   def main(args: Array[String]) {
>     if (args.length != 3) {
>       System.err.println("Usage: RawTextSender <port> <file> <sleepMillis>")
>       System.exit(1)
>     }
>     // Parse the arguments using a pattern match
>     val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>
>     val serverSocket = new ServerSocket(port)
>     println("Listening on port " + port)
>
>
>     while (true) {
>       val socket = serverSocket.accept()
>       println("Got a new connection")
>
>
>       val out = new PrintWriter(socket.getOutputStream)
>       try {
>         var count = 0
>         var startTimestamp = -1
>         for (line <- Source.fromFile(file).getLines()) {
>           val ts = line.substring(2, line.indexOf(',',2)).toInt
>           if(startTimestamp < 0)
>             startTimestamp = ts
>
>           if(ts - startTimestamp <= 30) {
>             out.println(line)
>             count += 1
>           } else {
>             println(s"Emmited reports: $count")
>             count = 0
>             out.flush()
>             startTimestamp = ts
>             Thread.sleep(sleepMillis)
>           }
>         }
>       } catch {
>         case e: IOException =>
>           println("Client disconnected")
>           socket.close()
>       }
>     }
> }
> }
>
>
>
> object Benchmark {
>   def main(args: Array[String]) {
>     if (args.length != 4) {
>       System.err.println("Usage: RawNetworkGrep <numStreams> <host> <port> 
> <batchMillis>")
>       System.exit(1)
>     }
>
>     val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), 
> args(2).toInt, args(3).toInt)
>     val sparkConf = new SparkConf()
>     sparkConf.setAppName("BenchMark")
>     
> sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
>     sparkConf.set("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
>     sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops 
> -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 
> -XX:MaxInlineSize=300 ")
>     if (sparkConf.getOption("spark.master") == None) {
>       // Master not set, as this was not launched through Spark-submit. 
> Setting master as local."
>       sparkConf.setMaster("local[*]")
>     }
>
>     // Create the context
>     val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>
>     val rawStreams = (1 to numStreams).map(_ =>
>       ssc.rawSocketStream[String](host, port, 
> StorageLevel.MEMORY_ONLY_SER)).toArray
>     val union = ssc.union(rawStreams)
>     union.count().map(c => s"Received $c records").print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> Thanks.
>
>

Reply via email to