Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-31 Thread Brandon White
Tathagata,

Could the bottleneck possibly be the number of executor nodes in our
cluster? Since we are creating 500 DStreams based off 500 textFile
directories, do we need at least 500 executors/nodes to act as receivers,
one for each of the streams?

On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com wrote:

 @Ashwin: You could append the topic in the data.

 val kafkaStreams = topics.map { topic =>
   KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
 }
 val unionedStream = context.union(kafkaStreams)


 @Brandon:
 I don't recommend it, but you could do something crazy like using
 foreachRDD to farm out the jobs to a thread pool, with the final foreachRDD
 waiting for all the jobs to complete.

 manyDStreams.foreach { dstream =>
   dstream.foreachRDD { rdd =>
     // Add a runnable that runs the job on the RDD to the thread pool
     // This does not wait for the job to finish
   }
 }

 anyOfTheManyDStreams.foreachRDD { _ =>
   // Wait for all of the current batch's jobs in the thread pool to complete.
 }


 This would run all the Spark jobs in the batch in parallel in the thread
 pool, but it would also make sure all the jobs finish before the batch is
 marked as completed.

 On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Thank you Tathagata. My main use case for the 500 streams is to append
 new elements into their corresponding Spark SQL tables. Every stream is
 mapped to a table, so I'd like to use the streams to append the new RDDs
 to its table. If I union all the streams, appending new elements becomes a
 nightmare. So there is no other way to parallelize something like the
 following? Will this still run sequentially, or time out?

 //500 streams
 streams.foreach { stream =>
   stream.foreachRDD { rdd =>
     val df = sqlContext.jsonRDD(rdd)
     df.saveAsTable(streamTuple._1, SaveMode.Append)
   }
 }

 On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com
 wrote:

 I don't think anyone has really run 500 text streams.
 And the par sequences do nothing there; you are only parallelizing the
 setup code, which does not really compute anything. Also, it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all
 the streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     // do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?







Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-31 Thread Ashwin Giridharan
Thanks a lot @Das @Cody. I moved from the receiver-based stream to the
direct stream and I can get the topics from the offset ranges!!

On Fri, Jul 31, 2015 at 4:41 PM, Brandon White bwwintheho...@gmail.com
wrote:

 Tathagata,

 Could the bottleneck possibly be the number of executor nodes in our
 cluster? Since we are creating 500 DStreams based off 500 textFile
 directories, do we need at least 500 executors/nodes to act as receivers,
 one for each of the streams?

 On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com
 wrote:

 @Ashwin: You could append the topic in the data.

 val kafkaStreams = topics.map { topic =>
   KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
 }
 val unionedStream = context.union(kafkaStreams)


 @Brandon:
 I don't recommend it, but you could do something crazy like using
 foreachRDD to farm out the jobs to a thread pool, with the final foreachRDD
 waiting for all the jobs to complete.

 manyDStreams.foreach { dstream =>
   dstream.foreachRDD { rdd =>
     // Add a runnable that runs the job on the RDD to the thread pool
     // This does not wait for the job to finish
   }
 }

 anyOfTheManyDStreams.foreachRDD { _ =>
   // Wait for all of the current batch's jobs in the thread pool to complete.
 }


 This would run all the Spark jobs in the batch in parallel in the thread
 pool, but it would also make sure all the jobs finish before the batch is
 marked as completed.

 On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Thank you Tathagata. My main use case for the 500 streams is to append
 new elements into their corresponding Spark SQL tables. Every stream is
 mapped to a table, so I'd like to use the streams to append the new RDDs
 to its table. If I union all the streams, appending new elements becomes a
 nightmare. So there is no other way to parallelize something like the
 following? Will this still run sequentially, or time out?

 //500 streams
 streams.foreach { stream =>
   stream.foreachRDD { rdd =>
     val df = sqlContext.jsonRDD(rdd)
     df.saveAsTable(streamTuple._1, SaveMode.Append)
   }
 }

 On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com
 wrote:

 I don't think anyone has really run 500 text streams.
 And the par sequences do nothing there; you are only parallelizing the
 setup code, which does not really compute anything. Also, it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all
 the streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com
  wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     // do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?








-- 
Thanks & Regards,
Ashwin Giridharan


Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-31 Thread Tathagata Das
@Brandon, the file streams do not use receivers, so the bottleneck is not
about executors per se. But there could be a couple of bottlenecks:
1. Every batch interval, the 500 DStreams are going to get directory
listings from 500 directories, SEQUENTIALLY. So preparing the batch's RDDs
and jobs can take time, and your batch interval can't be small; it may have
to be tens of seconds. That is probably fine for your application, otherwise
you would not be using files in the first place.
2. Processing new files from 500 directories may take significant
computation power. Just make sure you have a large enough cluster.

On Fri, Jul 31, 2015 at 2:40 PM, Ashwin Giridharan ashwin.fo...@gmail.com
wrote:

 Thanks a lot @Das @Cody. I moved from the receiver-based stream to the
 direct stream and I can get the topics from the offset ranges!!

 On Fri, Jul 31, 2015 at 4:41 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Tathagata,

 Could the bottleneck possibly be the number of executor nodes in our
 cluster? Since we are creating 500 DStreams based off 500 textFile
 directories, do we need at least 500 executors/nodes to act as receivers,
 one for each of the streams?

 On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com
 wrote:

 @Ashwin: You could append the topic in the data.

 val kafkaStreams = topics.map { topic =>
   KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
 }
 val unionedStream = context.union(kafkaStreams)


 @Brandon:
 I don't recommend it, but you could do something crazy like using
 foreachRDD to farm out the jobs to a thread pool, with the final foreachRDD
 waiting for all the jobs to complete.

 manyDStreams.foreach { dstream =>
   dstream.foreachRDD { rdd =>
     // Add a runnable that runs the job on the RDD to the thread pool
     // This does not wait for the job to finish
   }
 }

 anyOfTheManyDStreams.foreachRDD { _ =>
   // Wait for all of the current batch's jobs in the thread pool to complete.
 }


 This would run all the Spark jobs in the batch in parallel in the thread
 pool, but it would also make sure all the jobs finish before the batch is
 marked as completed.

 On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Thank you Tathagata. My main use case for the 500 streams is to append
 new elements into their corresponding Spark SQL tables. Every stream is
 mapped to a table, so I'd like to use the streams to append the new RDDs
 to its table. If I union all the streams, appending new elements becomes a
 nightmare. So there is no other way to parallelize something like the
 following? Will this still run sequentially, or time out?

 //500 streams
 streams.foreach { stream =>
   stream.foreachRDD { rdd =>
     val df = sqlContext.jsonRDD(rdd)
     df.saveAsTable(streamTuple._1, SaveMode.Append)
   }
 }

 On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com
 wrote:

 I don't think anyone has really run 500 text streams.
 And the par sequences do nothing there; you are only parallelizing the
 setup code, which does not really compute anything. Also, it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all
 the streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White 
 bwwintheho...@gmail.com wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     // do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?








 --
 Thanks & Regards,
 Ashwin Giridharan



Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-29 Thread Cody Koeninger
@Ashwin you don't need to append the topic to your data if you're using the
direct stream. You can get the topic from the offset range; see
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
(search for offsetRange).

If you're using the receiver-based stream, you'll need to follow TD's
suggestion.
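
Roughly the pattern from that page, as a quick untested sketch (kafkaParams
and topicsSet are placeholders, and this assumes the 1.x / Kafka 0.8 direct
API):

import kafka.serializer.StringDecoder
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// kafkaParams and topicsSet are whatever you already use to create the stream
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

stream.foreachRDD { rdd =>
  // The cast works on the direct stream's RDDs, before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // Each partition of a direct-stream RDD maps 1:1 to a Kafka (topic, partition),
    // so the partition id indexes into offsetRanges and gives you the topic
    val o = offsetRanges(TaskContext.get.partitionId)
    // take your topic-specific action here, keyed on o.topic, over iter
  }
}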

On Tue, Jul 28, 2015 at 8:09 PM, Tathagata Das t...@databricks.com wrote:

 @Ashwin: You could append the topic in the data.

 val kafkaStreams = topics.map { topic =>
   KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
 }
 val unionedStream = context.union(kafkaStreams)


 @Brandon:
 I don't recommend it, but you could do something crazy like using
 foreachRDD to farm out the jobs to a thread pool, with the final foreachRDD
 waiting for all the jobs to complete.

 manyDStreams.foreach { dstream =>
   dstream.foreachRDD { rdd =>
     // Add a runnable that runs the job on the RDD to the thread pool
     // This does not wait for the job to finish
   }
 }

 anyOfTheManyDStreams.foreachRDD { _ =>
   // Wait for all of the current batch's jobs in the thread pool to complete.
 }


 This would run all the Spark jobs in the batch in parallel in the thread
 pool, but it would also make sure all the jobs finish before the batch is
 marked as completed.

 On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Thank you Tathagata. My main use case for the 500 streams is to append
 new elements into their corresponding Spark SQL tables. Every stream is
 mapped to a table, so I'd like to use the streams to append the new RDDs
 to its table. If I union all the streams, appending new elements becomes a
 nightmare. So there is no other way to parallelize something like the
 following? Will this still run sequentially, or time out?

 //500 streams
 streams.foreach { stream =>
   stream.foreachRDD { rdd =>
     val df = sqlContext.jsonRDD(rdd)
     df.saveAsTable(streamTuple._1, SaveMode.Append)
   }
 }

 On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com
 wrote:

 I don't think anyone has really run 500 text streams.
 And the par sequences do nothing there; you are only parallelizing the
 setup code, which does not really compute anything. Also, it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all
 the streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     // do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?







Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-28 Thread Ashwin Giridharan
@Das, is there any way to identify a Kafka topic when we have a unified
stream? As of now, for each topic I create a dedicated DStream and use
foreachRDD on each of these streams. If I have, say, 100 Kafka topics, then
how can I use a unified stream and still take topic-specific actions inside
foreachRDD?

On Tue, Jul 28, 2015 at 6:42 PM, Tathagata Das t...@databricks.com wrote:

 I don't think anyone has really run 500 text streams.
 And the par sequences do nothing there; you are only parallelizing the
 setup code, which does not really compute anything. Also, it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all
 the streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     // do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?





-- 
Thanks & Regards,
Ashwin Giridharan


Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-28 Thread Brandon White
Thank you Tathagata. My main use case for the 500 streams is to append new
elements into their corresponding Spark SQL tables. Every stream is mapped
to a table, so I'd like to use the streams to append the new RDDs to its
table. If I union all the streams, appending new elements becomes a
nightmare. So there is no other way to parallelize something like the
following? Will this still run sequentially, or time out?

//500 streams
streams.foreach { stream =>
  stream.foreachRDD { rdd =>
    val df = sqlContext.jsonRDD(rdd)
    df.saveAsTable(streamTuple._1, SaveMode.Append)
  }
}
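
For what it's worth, the closest I can come up with using a single union is
to tag every line with its table up front and split again at write time,
roughly the untested sketch below (tablePaths here is a hypothetical map of
table name to directory), and that still means a filter and a write per
table inside one foreachRDD, which is the nightmare I mean:

import org.apache.spark.sql.SaveMode

// Hypothetical: tablePaths maps each Spark SQL table name to its S3 directory
val tagged = tablePaths.toSeq.map { case (table, path) =>
  ssc.textFileStream(path).map(line => (table, line))
}
val unioned = ssc.union(tagged)

unioned.foreachRDD { rdd =>
  rdd.cache()
  for (table <- tablePaths.keys) {
    // One filter and one append per table, every batch
    val lines = rdd.filter(_._1 == table).map(_._2)
    if (!lines.isEmpty()) {
      sqlContext.jsonRDD(lines).saveAsTable(table, SaveMode.Append)
    }
  }
  rdd.unpersist()
}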

On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com wrote:

 I don't think anyone has really run 500 text streams.
 And the par sequences do nothing there; you are only parallelizing the
 setup code, which does not really compute anything. Also, it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all
 the streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     // do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?





Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-28 Thread Tathagata Das
@Ashwin: You could append the topic in the data.

val kafkaStreams = topics.map { topic =>
  KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
}
val unionedStream = context.union(kafkaStreams)


@Brandon:
I don't recommend it, but you could do something crazy like using
foreachRDD to farm out the jobs to a thread pool, with the final foreachRDD
waiting for all the jobs to complete.

manyDStreams.foreach { dstream =>
  dstream.foreachRDD { rdd =>
    // Add a runnable that runs the job on the RDD to the thread pool
    // This does not wait for the job to finish
  }
}

anyOfTheManyDStreams.foreachRDD { _ =>
  // Wait for all of the current batch's jobs in the thread pool to complete.
}


This would run all the Spark jobs in the batch in parallel in the thread
pool, but it would also make sure all the jobs finish before the batch is
marked as completed.
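
If you do go down that road, a rough, untested sketch of the idea looks
something like this (the pool size and the placeholder action are arbitrary,
and it assumes the default spark.streaming.concurrentJobs = 1, so the output
operations themselves are invoked one at a time by the scheduler):

import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Driver-side pool used only to launch the Spark jobs concurrently
implicit val jobPool = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
val pendingJobs = ArrayBuffer.empty[Future[Unit]]

manyDStreams.foreach { dstream =>
  dstream.foreachRDD { rdd =>
    // Submit the job and return immediately; nothing waits here
    pendingJobs += Future { rdd.foreachPartition(_ => ()) /* placeholder action */ }
  }
}

// Registered last, so it runs after all the submissions for the batch
manyDStreams.head.foreachRDD { _ =>
  Await.result(Future.sequence(pendingJobs.toList), 10.minutes)
  pendingJobs.clear()
}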

On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com
wrote:

 Thank you Tathagata. My main use case for the 500 streams is to append
 new elements into their corresponding Spark SQL tables. Every stream is
 mapped to a table, so I'd like to use the streams to append the new RDDs
 to its table. If I union all the streams, appending new elements becomes a
 nightmare. So there is no other way to parallelize something like the
 following? Will this still run sequentially, or time out?

 //500 streams
 streams.foreach { stream =>
   stream.foreachRDD { rdd =>
     val df = sqlContext.jsonRDD(rdd)
     df.saveAsTable(streamTuple._1, SaveMode.Append)
   }
 }

 On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com
 wrote:

 I don't think anyone has really run 500 text streams.
 And the par sequences do nothing there; you are only parallelizing the
 setup code, which does not really compute anything. Also, it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all
 the streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     // do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?






Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-28 Thread Tathagata Das
I don't think anyone has really run 500 text streams.
And the par sequences do nothing there; you are only parallelizing the setup
code, which does not really compute anything. Also, it sets up 500 foreachRDD
operations that will get executed in each batch sequentially, so it does not
make sense. The right way to parallelize this is to union all the streams.

val streams = streamPaths.map { path =>
  ssc.textFileStream(path)
}
val unionedStream = streamingContext.union(streams)
unionedStream.foreachRDD { rdd =>
  // do something
}

Then there is only one foreachRDD executed in every batch that will process
in parallel all the new files in each batch interval.
TD


On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com
wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     // do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?