Thanks a lot. Now it's working fine. I wasn't aware of socketTextStream;
I'm not sure whether it's documented in the Spark programming guide.
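
For the archives, the change that made it work was just creating the streams
with socketTextStream instead of rawSocketStream. A minimal sketch of the
fixed benchmark (BenchmarkFixed is a made-up name; the jar and serializer
settings from the original are omitted for brevity):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}

object BenchmarkFixed {
  def main(args: Array[String]) {
    val (numStreams, host, port, batchMillis) =
      (args(0).toInt, args(1), args(2).toInt, args(3).toInt)

    val sparkConf = new SparkConf().setAppName("BenchMark")
    if (sparkConf.getOption("spark.master").isEmpty) {
      sparkConf.setMaster("local[*]") // not launched through spark-submit
    }
    val ssc = new StreamingContext(sparkConf, Duration(batchMillis))

    // socketTextStream yields one String per newline-terminated line, which
    // matches a PrintWriter-based generator; rawSocketStream instead expects
    // blocks already serialized with Spark's serializer.
    val streams = (1 to numStreams).map(_ =>
      ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER))
    ssc.union(streams).count().map(c => s"Received $c records").print()

    ssc.start()
    ssc.awaitTermination()
  }
}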

On Mon, Sep 21, 2015 at 12:46 PM, Hemant Bhanawat <hemant9...@gmail.com>
wrote:

> Why are you using rawSocketStream to read the data? I believe
> rawSocketStream waits for a big chunk of data before it can start
> processing it. I think what you are writing is a String, so you should use
> socketTextStream, which reads the data on a per-line basis.
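
For anyone finding this in the archives: per the API docs, rawSocketStream
expects blocks already serialized with Spark's serializer (the format that
RawTextSender in the Spark examples produces), so plain text lines never add
up to a complete block, hence the constant "Received 0 records". With a
line-based generator, the matching receiver side is, roughly:

  // one record arrives here per out.println(...) on the generator side
  val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER)
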
>
> On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa <saiph.ka...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am trying to build a data generator that feeds a streaming application.
>> This data generator just reads a file and sends its lines through a socket.
>> I get no errors in the logs, and the benchmark below always prints
>> "Received 0 records". Am I doing something wrong?
>>
>>
>> import java.io.{IOException, PrintWriter}
>> import java.net.ServerSocket
>>
>> import scala.io.Source
>>
>> object MyDataGenerator {
>>
>>   def main(args: Array[String]) {
>>     if (args.length != 3) {
>>       System.err.println("Usage: MyDataGenerator <port> <file> <sleepMillis>")
>>       System.exit(1)
>>     }
>>     // Parse the arguments using a pattern match
>>     val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>>
>>     val serverSocket = new ServerSocket(port)
>>     println("Listening on port " + port)
>>
>>     while (true) {
>>       val socket = serverSocket.accept()
>>       println("Got a new connection")
>>
>>       val out = new PrintWriter(socket.getOutputStream)
>>       try {
>>         var count = 0
>>         var startTimestamp = -1
>>         for (line <- Source.fromFile(file).getLines()) {
>>           // Parse the timestamp: characters 2 up to the first ',' at or after index 2
>>           val ts = line.substring(2, line.indexOf(',', 2)).toInt
>>           if (startTimestamp < 0)
>>             startTimestamp = ts
>>
>>           if (ts - startTimestamp <= 30) {
>>             out.println(line)
>>             count += 1
>>           } else {
>>             // 30-second window done: report, flush, and start a new window.
>>             println(s"Emitted reports: $count")
>>             out.flush()
>>             startTimestamp = ts
>>             Thread.sleep(sleepMillis)
>>             out.println(line) // don't drop the line that opens the new window
>>             count = 1
>>           }
>>         }
>>         out.flush() // push out whatever remains of the last window
>>         println(s"Emitted reports: $count")
>>       } catch {
>>         case _: IOException =>
>>           println("Client disconnected")
>>       } finally {
>>         socket.close()
>>       }
>>     }
>>   }
>> }
>>
>>
>>
>> import org.apache.spark.SparkConf
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.{Duration, StreamingContext}
>>
>> object Benchmark {
>>   def main(args: Array[String]) {
>>     if (args.length != 4) {
>>       System.err.println("Usage: Benchmark <numStreams> <host> <port> <batchMillis>")
>>       System.exit(1)
>>     }
>>
>>     val (numStreams, host, port, batchMillis) =
>>       (args(0).toInt, args(1), args(2).toInt, args(3).toInt)
>>     val sparkConf = new SparkConf()
>>     sparkConf.setAppName("BenchMark")
>>     sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
>>     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>     sparkConf.set("spark.executor.extraJavaOptions",
>>       " -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts" +
>>       " -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
>>     if (sparkConf.getOption("spark.master").isEmpty) {
>>       // Master not set, so this was not launched through spark-submit;
>>       // fall back to a local master.
>>       sparkConf.setMaster("local[*]")
>>     }
>>
>>     // Create the context
>>     val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>
>>     val rawStreams = (1 to numStreams).map(_ =>
>>       ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
>>     val union = ssc.union(rawStreams)
>>     union.count().map(c => s"Received $c records").print()
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> Thanks.
>>
>>
>
