Re: DataGenerator for streaming application

2015-09-21 Thread Hemant Bhanawat
Why are you using rawSocketStream to read the data? I believe
rawSocketStream waits for a big chunk of data before it can start
processing it. I think what you are writing is plain text, so you should
use socketTextStream, which reads the data line by line.
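
For reference, a minimal sketch of that change in the Benchmark code
quoted below (untested, and assuming the generator really does send
newline-delimited text):

// socketTextStream splits the incoming UTF-8 byte stream on newlines,
// so each line written by the generator becomes one String record.
val lines = (1 to numStreams).map(_ =>
  ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER))
val union = ssc.union(lines)
union.count().map(c => s"Received $c records").print()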

On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa wrote:

> Hi,
>
> I am trying to build a data generator that feeds a streaming application.
> This data generator just reads a file and sends its lines through a socket.
> I get no errors in the logs, but the benchmark below always prints
> "Received 0 records". Am I doing something wrong?
>
> [generator and benchmark code snipped; see the original message below]
>
> Thanks.


Re: DataGenerator for streaming application

2015-09-21 Thread Saiph Kappa
Thanks a lot, it's working fine now. I wasn't aware of "socketTextStream";
I'm not sure whether it is documented in the Spark programming guide.
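
For the archives: socketTextStream is a method on StreamingContext, and as
far as I can tell it does appear in the quick example of the Spark Streaming
programming guide. The call that replaces rawSocketStream here is just:

// One String record per newline-delimited line received on the socket.
val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER)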

On Mon, Sep 21, 2015 at 12:46 PM, Hemant Bhanawat wrote:

> Why are you using rawSocketStream to read the data? I believe
> rawSocketStream waits for a big chunk of data before it can start
> processing it. I think what you are writing is plain text, so you should
> use socketTextStream, which reads the data line by line.
>
> On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa wrote:
>
>> [original message and code snipped; see below]


DataGenerator for streaming application

2015-09-19 Thread Saiph Kappa
Hi,

I am trying to build a data generator that feeds a streaming application.
This data generator just reads a file and sends its lines through a socket.
I get no errors in the logs, but the benchmark below always prints
"Received 0 records". Am I doing something wrong?


import java.io.{IOException, PrintWriter}
import java.net.ServerSocket

import scala.io.Source

object MyDataGenerator {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: RawTextSender <port> <file> <sleepMillis>")
      System.exit(1)
    }
    // Parse the arguments using a pattern match
    val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)

    val serverSocket = new ServerSocket(port)
    println("Listening on port " + port)

    while (true) {
      val socket = serverSocket.accept()
      println("Got a new connection")

      val out = new PrintWriter(socket.getOutputStream)
      try {
        var count = 0
        var startTimestamp = -1
        for (line <- Source.fromFile(file).getLines()) {
          // The timestamp field starts at offset 2 and ends at the first comma
          val ts = line.substring(2, line.indexOf(',', 2)).toInt
          if (startTimestamp < 0)
            startTimestamp = ts

          if (ts - startTimestamp <= 30) {
            // Same 30-second window: keep buffering lines
            out.println(line)
            count += 1
          } else {
            // Window boundary: flush the batch and pause before the next one
            println(s"Emitted reports: $count")
            count = 0
            out.flush()
            startTimestamp = ts
            Thread.sleep(sleepMillis)
          }
        }
      } catch {
        case e: IOException =>
          println("Client disconnected")
          socket.close()
      }
    }
  }
}
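
A quick way to sanity-check the generator independently of Spark is to
connect a plain socket and print what arrives. A minimal sketch (the host
and port are just examples; note the generator only flushes at each
30-second window boundary, so lines arrive in bursts):

import java.net.Socket
import scala.io.Source

object SocketProbe {
  def main(args: Array[String]): Unit = {
    // Connect to the generator and echo the first few lines it sends.
    val socket = new Socket("localhost", 9999)
    try {
      Source.fromInputStream(socket.getInputStream).getLines().take(5).foreach(println)
    } finally {
      socket.close()
    }
  }
}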



import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}

object Benchmark {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis>")
      System.exit(1)
    }

    val (numStreams, host, port, batchMillis) =
      (args(0).toInt, args(1), args(2).toInt, args(3).toInt)
    val sparkConf = new SparkConf()
    sparkConf.setAppName("BenchMark")
    sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.executor.extraJavaOptions",
      " -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts" +
        " -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
    if (sparkConf.getOption("spark.master").isEmpty) {
      // Master not set, so this was not launched through spark-submit; run locally
      sparkConf.setMaster("local[*]")
    }

    // Create the streaming context with the requested batch interval
    val ssc = new StreamingContext(sparkConf, Duration(batchMillis))

    // One receiver per stream, all reading from the same generator socket
    val rawStreams = (1 to numStreams).map(_ =>
      ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
    val union = ssc.union(rawStreams)
    union.count().map(c => s"Received $c records").print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks.