While the program is running, go to your cluster's web UI (it runs on port 8080,
probably at hadoop.master:8080) and check how many cores are allocated to the
application. It should be >= 2 for the stream to get processed, because the
socket receiver occupies one core and at least one more is needed to process
the batches.
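As a rough sketch only (reusing the hadoop.master:7077 master and port 9999
socket from your mail; the object name and config values here are just
illustrative), you can also make the core requirement explicit in the
application itself via spark.cores.max:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverCoreCheck {
  def main(args: Array[String]): Unit = {
    // Ask the standalone master for at least 2 cores: one is permanently
    // taken by the socket receiver, the rest process the batches.
    val conf = new SparkConf()
      .setAppName("ReceiverCoreCheck")
      .setMaster("spark://hadoop.master:7077") // same master as in the submit command
      .set("spark.cores.max", "2")             // equivalent to --total-executor-cores 2
    val ssc = new StreamingContext(conf, Seconds(6))

    // The receiver created here occupies one of the allocated cores.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print() // prints the number of lines per 6-second batch

    ssc.start()
    ssc.awaitTermination()
  }
}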





Thanks
Best Regards

On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com <bit1...@163.com> wrote:

> Hi,
> I am trying the Spark Streaming log analysis reference application
> provided by Databricks at
> https://github.com/databricks/reference-apps/tree/master/logs_analyzer
> When I deploy the code to the standalone cluster with the following shell
> script, there is no output at all, which means the windowDStream has 0
> RDDs:
> ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master
> spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3
> --class
> spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming
> LogApp.jar
>
> But when I change --master to --master local[3], the program starts to
> work fine. Does anyone have any advice? Thanks!
> ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master
> local[3] --executor-memory 512M --total-executor-cores 3 --class
> spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming 
> LogApp.jar
>
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.streaming.{Duration, StreamingContext}
>
> object LogAnalyzerStreaming {
>
>   val WINDOW_LENGTH = new Duration(12 * 1000)
>   val SLIDE_INTERVAL = new Duration(6 * 1000)
>
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
>     val sc = new SparkContext(sparkConf)
>     val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
>
>     val logLinesDStream = streamingContext.socketTextStream("localhost", 9999)
>
>     val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
>     val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
>
>     windowDStream.foreachRDD(accessLogs => {
>       if (accessLogs.count() == 0) {
>         println("No access com.databricks.app.logs received in this time interval")
>       } else {
>         // Calculate statistics based on the content size.
>         val contentSizes = accessLogs.map(log => log.contentSize).cache()
>         println("Content Size Avg: %s, Min: %s, Max: %s".format(
>           contentSizes.reduce(_ + _) / contentSizes.count,
>           contentSizes.min,
>           contentSizes.max
>         ))
>       }
>     })
>
>     streamingContext.start()
>     streamingContext.awaitTermination()
>   }
> }
>
>
