While the program is running, go to your cluster's web UI (it usually runs on port 8080, probably at hadoop.master:8080) and check how many cores are allocated to the application. It should be >= 2 for the stream to get processed: the socket receiver occupies one core, so at least one more is needed to actually process the received data.
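For example, a submit command that explicitly requests two cores might look like the following (a sketch reusing the master URL, jar, and class name from the original post; adjust to your setup):

```shell
# Request at least 2 total executor cores:
# one is taken by the socket receiver, the rest process the batches.
./spark-submit \
  --deploy-mode client \
  --name LogAnalyzerStreaming \
  --master spark://hadoop.master:7077 \
  --executor-memory 512M \
  --total-executor-cores 2 \
  --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming \
  LogApp.jar
```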
Thanks
Best Regards

On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com <bit1...@163.com> wrote:

> Hi,
> I am trying the Spark Streaming log analysis reference application
> provided by Databricks at
> https://github.com/databricks/reference-apps/tree/master/logs_analyzer
> When I deploy the code to the standalone cluster, there is no output at
> all with the following shell script, which means the windowDStream has
> 0 RDDs:
>
> ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar
>
> But when I change --master to --master local[3], the program starts to
> work fine. Does anyone have some advice? Thanks!
>
> ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master local[3] --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar
>
> object LogAnalyzerStreaming {
>
>   val WINDOW_LENGTH = new Duration(12 * 1000)
>   val SLIDE_INTERVAL = new Duration(6 * 1000)
>
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
>     val sc = new SparkContext(sparkConf)
>     val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
>
>     val logLinesDStream = streamingContext.socketTextStream("localhost", 9999)
>
>     val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
>     val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
>
>     windowDStream.foreachRDD(accessLogs => {
>       if (accessLogs.count() == 0) {
>         println("No access com.databricks.app.logs received in this time interval")
>       } else {
>         // Calculate statistics based on the content size.
>         val contentSizes = accessLogs.map(log => log.contentSize).cache()
>         println("Content Size Avg: %s, Min: %s, Max: %s".format(
>           contentSizes.reduce(_ + _) / contentSizes.count,
>           contentSizes.min,
>           contentSizes.max
>         ))
>       }
>     })
>
>     streamingContext.start()
>     streamingContext.awaitTermination()
>   }
> }
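One more thing to check: since the code reads from socketTextStream("localhost", 9999), something has to be listening on that port and writing log lines, or no data will arrive at all. A common way to feed it during testing (assuming netcat is installed; the file name here is just a placeholder) is:

```shell
# Listen on port 9999 and serve sample log lines to the streaming receiver.
# -l: listen mode, -k: keep listening after a client disconnects.
nc -lk 9999 < sample_apache.log
```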