Question about Spark Streaming checkpoint interval
Need some clarification about the documentation. According to the Spark docs: "the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try." My question is: does checkpointInterval apply only to data checkpointing, or does it also apply to metadata checkpointing? The API says dstream.checkpoint() is to "Enable periodic checkpointing of RDDs of this DStream", implying it is only for data checkpointing. My understanding is that metadata checkpointing is for driver failure. For example, with the Kafka direct API, the driver keeps track of the offset range of each partition. So if a metadata checkpoint is NOT done for each batch, then on driver failure some messages from Kafka are going to be replayed. I cannot find anywhere in the documentation whether metadata checkpointing is done for each batch and whether the checkpointInterval setting applies to both types of checkpointing. Maybe I missed it. If anyone can point me to the right documentation, I would highly appreciate it. Best Regards, Lan
Re: Question about Spark Streaming checkpoint interval
You are right. "checkpointInterval" is only for data checkpointing; the metadata checkpoint is done for each batch. Feel free to send a PR to add the missing doc. Best Regards, Shixiong Zhu
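For reference, a minimal sketch of how the two kinds of checkpointing are configured. The checkpoint directory, batch interval, socket source, and word-count state below are illustrative assumptions, not from the thread; the point is only which call controls which kind of checkpoint.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointIntervalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CheckpointIntervalSketch")
    val ssc  = new StreamingContext(conf, Seconds(10))     // 10-second batches (assumed)

    // Enables checkpointing as a whole. Metadata (including offset ranges when the
    // Kafka direct API is used) is written here every batch, independent of any
    // per-DStream interval.
    ssc.checkpoint("hdfs:///tmp/checkpoints/my-app")        // placeholder path

    val lines  = ssc.socketTextStream("localhost", 9999)    // placeholder source
    val counts = lines.map(w => (w, 1L)).updateStateByKey[Long] {
      (newValues: Seq[Long], running: Option[Long]) =>
        Some(running.getOrElse(0L) + newValues.sum)
    }

    // dstream.checkpoint(...) only controls how often the RDD *data* of this DStream
    // is checkpointed; here every 5 batches, following the docs' 5-10x guidance.
    counts.checkpoint(Seconds(50))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}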
Re: question about spark streaming
Have a look at the various versions of PairDStreamFunctions.updateStateByKey ( http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions). It supports updating running state in memory. (You can persist the state to a database or files periodically if you want.) Use an in-memory data structure like a hash map with SKU-price key-values and update the map as needed on each iteration. One of the versions of this function lets you specify a partitioner if you still need to shard keys. Also, I would be flexible about the 1-second batch interval. Is that really a mandatory requirement for this problem? HTH, Dean. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
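For reference, a minimal sketch of the stateful approach described above, assuming the trades arrive as (productId, (eventTimeMillis, price)) pairs. The state class, the pruning to the last 180 seconds, and the use of processing time are illustrative choices rather than a tested implementation, and updateStateByKey requires ssc.checkpoint(...) to be set.

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// State kept per product: the trades seen in the last 180 seconds.
case class PriceState(recent: Vector[(Long, Double)])     // (eventTimeMillis, price)

object PriceAlertSketch {
  val WindowMillis = 180 * 1000L
  val AlertRatio   = 1.10                                  // alert on a +10% move within the window

  // Called once per key per batch with that key's new values for the batch.
  def update(newTrades: Seq[(Long, Double)],
             state: Option[PriceState]): Option[PriceState] = {
    val now    = System.currentTimeMillis()                // processing time; event time would be better
    val merged = (state.map(_.recent).getOrElse(Vector.empty) ++ newTrades)
      .filter { case (t, _) => now - t <= WindowMillis }
    Some(PriceState(merged))
  }

  def alerts(state: DStream[(String, PriceState)]): DStream[(String, Double)] =
    state.flatMap {
      case (productId, st) if st.recent.nonEmpty =>
        val prices = st.recent.map(_._2)
        val latest = st.recent.maxBy(_._1)._2
        if (latest >= prices.min * AlertRatio) Seq(productId -> latest) else Seq.empty
      case _ => Seq.empty
    }

  def wire(ssc: StreamingContext,
           trades: DStream[(String, (Long, Double))]): DStream[(String, Double)] = {
    // The explicit partitioner keeps every productId's state in a fixed partition,
    // which is the sharding-by-key idea mentioned above.
    val partitioner = new HashPartitioner(ssc.sparkContext.defaultParallelism)
    alerts(trades.updateStateByKey(update _, partitioner))
  }
}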
question about spark streaming
hi guys, i have a question about spark streaming. There's an application that keeps sending transaction records into a Spark stream at about 50k tps. Each record represents a sale and includes customer id / product id / time / price columns. The application is required to monitor the price change of each product. For example, if the price of a product increases 10% within 3 minutes, it should send an alert to the end user. The batch interval is required to be 1 second; the window is somewhere between 180 and 300 seconds. The issue is that I have to compare the price of each transaction (about 10k different products in total) against the lowest/highest price for the same product over the past 180 seconds. That means, every single second, I have to loop through 50k transactions and compare the price of the same product across all 180 seconds. So it seems I have to partition the calculation by product id, so that each worker only processes a certain list of products. For example, if I can make sure the same product id always goes to the same worker, there is no need to shuffle data between workers for each comparison. Otherwise, if each transaction has to be compared against RDDs spread across multiple workers, I guess it may not be fast enough for the requirement. Does anyone know how to pin each transaction record to a worker node based on its product id, in order to avoid a massive shuffle operation? If I simply make the product id the key and the price the value, reduceByKeyAndWindow may cause a massive shuffle and slow down the whole throughput. Am I correct? Thanks
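For comparison, a minimal sketch of the reduceByKeyAndWindow route asked about here, assuming a DStream of (productId, price) pairs and a 1-second batch interval. Because min/max has no inverse, every window is recomputed from its batches, which at 50k tps over 180 seconds is exactly the cost the reply above avoids by keeping running state per key instead.

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object MinMaxByWindowSketch {
  // (min, max) price per product over the last 180 seconds, sliding every second.
  def minMax(ssc: StreamingContext,
             trades: DStream[(String, Double)]): DStream[(String, (Double, Double))] = {
    val partitioner = new HashPartitioner(ssc.sparkContext.defaultParallelism)
    trades
      .mapValues(p => (p, p))                              // seed (min, max) with the price itself
      .reduceByKeyAndWindow(
        (a: (Double, Double), b: (Double, Double)) =>
          (math.min(a._1, b._1), math.max(a._2, b._2)),
        Seconds(180),                                      // window length
        Seconds(1),                                        // slide = batch interval
        partitioner)                                       // shard by productId
  }
}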
Re: Question about Spark Streaming Receiver Failure
Dibyendu, Thanks for the reply. I am reading your project homepage now. One quick question I care about: if the receivers fail for some reason (for example, they are killed brutally by someone else), is there any mechanism for the receivers to fail over automatically?
Question about Spark Streaming Receiver Failure
Guys, We have a project built upon Spark Streaming. We use Kafka as the input stream and create 5 receivers. When this application had run for around 90 hours, all 5 receivers failed for some unknown reason. In my understanding, it is not guaranteed that a Spark Streaming receiver will recover from faults automatically, so I just want to figure out a way of doing fault recovery to deal with receiver failure. There is a JIRA comment that mentions using a StreamingListener to monitor the status of receivers: https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836 However, I haven't found any official doc about how to do this. Has anyone met the same issue and dealt with it? Our environment: Spark 1.3.0, dual-master configuration, Kafka 0.8.2. Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
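For reference, a minimal sketch of the StreamingListener approach mentioned above, using the org.apache.spark.streaming.scheduler API that ships with Spark 1.3. The println reactions are placeholders for whatever alerting or restart logic the application actually needs.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStopped}

// Reacts to receiver trouble on the driver; what to do in response (page someone,
// restart the application, etc.) is left to the application.
class ReceiverWatcher extends StreamingListener {
  override def onReceiverError(event: StreamingListenerReceiverError): Unit =
    println(s"Receiver error: ${event.receiverInfo}")

  override def onReceiverStopped(event: StreamingListenerReceiverStopped): Unit =
    println(s"Receiver stopped: ${event.receiverInfo}")
}

object ReceiverWatcherWiring {
  def register(ssc: StreamingContext): Unit =
    ssc.addStreamingListener(new ReceiverWatcher)    // register before ssc.start()
}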
Re: Question about Spark Streaming Receiver Failure
Akhil, I have checked the logs. There isn't any clue as to why the 5 receivers failed. That's why I take it for granted that receiver failure will be a common issue, and that we need to figure out a way to detect this kind of failure and fail over. Thanks
Re: Question about Spark Streaming Receiver Failure
You need to figure out why the receivers failed in the first place. Look in your worker logs and see what really happened. When you run a streaming job continuously for a long period there will usually be a lot of logs (you can enable log rotation etc.), and if you are doing groupBy, join, and similar operations, there will also be a lot of shuffle data. So check the worker logs and see what happened (whether the disk filled up, etc.). We have streaming pipelines running for weeks without any issues. Thanks Best Regards
Re: Question about Spark Streaming Receiver Failure
Which version of Spark are you running? You can try this low-level consumer: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer It is designed to recover from various failures and has a very good fault-recovery mechanism built in. It is being used by many users, and at present we at Pearson have been running this receiver in production for almost 3 months without any issue. You can give it a try. Regards, Dibyendu
Re: Question about Spark Streaming Receiver Failure
Yes, auto restart is enabled in my low-level consumer: it kicks in when some unhandled exception comes up. And if you look at KafkaConsumer.java, for some cases (like broker failure, Kafka leader changes, etc.) it can even refresh the consumer (the coordinator which talks to a leader), which recovers from those failures. Dib
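For reference, a minimal sketch of the same restart idea expressed with Spark's own Receiver API, using a toy socket source; the host, port, and 5-second delay are placeholders. Receiver.restart() schedules an asynchronous stop and re-launch of the receiver, which is what the KafkaReceiver.java snippet shown in the next message relies on.

import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.io.Source

// A toy socket receiver: if the receiving thread dies with an unhandled exception,
// restart() asks Spark to stop and re-launch this receiver after a delay.
class RestartingSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    val t = new Thread("socket-receiver") {
      override def run(): Unit = receive()
    }
    t.setDaemon(true)
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(th: Thread, ex: Throwable): Unit =
        restart("Restarting receiver after failure", ex, 5000)
    })
    t.start()
  }

  override def onStop(): Unit = ()   // the receiving thread is a daemon; nothing else to clean up

  private def receive(): Unit = {
    val socket = new Socket(host, port)
    try {
      Source.fromInputStream(socket.getInputStream, StandardCharsets.UTF_8.name())
        .getLines()
        .takeWhile(_ => !isStopped())
        .foreach(line => store(line))
    } finally socket.close()
  }
}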
Re: Question about Spark Streaming Receiver Failure
I have checked Dibyendu's code, and it looks like his implementation has an auto-restart mechanism. src/main/java/consumer/kafka/client/KafkaReceiver.java:
private void start() {
  // Start the thread that receives data over a connection
  KafkaConfig kafkaConfig = new KafkaConfig(_props);
  ZkState zkState = new ZkState(kafkaConfig);
  _kConsumer = new KafkaConsumer(kafkaConfig, zkState, this);
  _kConsumer.open(_partitionId);
  Thread.UncaughtExceptionHandler eh = new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread th, Throwable ex) {
      restart("Restarting Receiver for Partition " + _partitionId, ex, 5000);
    }
  };
  _consumerThread = new Thread(_kConsumer);
  _consumerThread.setDaemon(true);
  _consumerThread.setUncaughtExceptionHandler(eh);
  _consumerThread.start();
}
I also checked Spark's native Kafka receiver implementation, and it does not look like it has any auto-restart support. Any comments from Dibyendu? On Mon, Mar 16, 2015 at 3:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote: As I've seen, once I kill my receiver on one machine, it will automatically spawn another receiver on another machine or on the same machine. Thanks Best Regards -- yangjun...@gmail.com http://hi.baidu.com/yjpro
Re: Re: Question about spark streaming+Flume
Hi Arush, With your code, I still didn't see the output "Received X flume events". bit1...@163.com
Question about spark streaming+Flume
Hi, I am trying the Spark Streaming + Flume example:
1. Code
object SparkFlumeNGExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGExample")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = FlumeUtils.createStream(ssc, "localhost", )
    // Print out the count of events received from this server in each batch
    lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
2. I submit the application with the following command:
./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar
When I write data to Flume, I only notice the following console output showing that input was added:
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB)
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms
But I didn't see the output from the code: "Received X flume events". I have no idea where the problem is, any ideas? Thanks
Re: Question about spark streaming+Flume
Hi, can you try this?
val lines = FlumeUtils.createStream(ssc, "localhost", )
// Print out the count of events received from this server in each batch
lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis())
lines.forechRDD(_.foreach(println))
Thanks Arush -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Re: Question about spark streaming+Flume
Ok, you are missing a letter in foreachRDD.. let me proceed. bit1...@163.com
Re: Re: Question about spark streaming+Flume
Thanks Arush.. With your code, a compile error occurs:
Error:(19, 11) value forechRDD is not a member of org.apache.spark.streaming.dstream.ReceiverInputDStream[org.apache.spark.streaming.flume.SparkFlumeEvent]
lines.forechRDD(_.foreach(println))
^
bit1...@163.com
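For reference, a corrected sketch of the suggestion above: foreachRDD with the right spelling, plus a note that rdd.foreach(println) executes on the executors, so its output ends up in executor stdout rather than the driver console. Pulling a few events back to the driver with take() is an addition here, not something from the thread, and only makes sense for small event counts.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeDebugSketch {
  def wire(ssc: StreamingContext, host: String, port: Int): Unit = {
    val lines = FlumeUtils.createStream(ssc, host, port)

    // Per-batch count, printed on the driver console (same as the original example).
    lines.count()
      .map(cnt => "Received " + cnt + " flume events at " + System.currentTimeMillis())
      .print()

    // foreachRDD (note the spelling) runs on the driver; collecting a handful of
    // events makes them visible in the driver log instead of executor stdout.
    lines.foreachRDD { rdd =>
      rdd.take(10).foreach(event => println(event))
    }
  }
}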