Re: Spark Streaming + Kafka failure recovery
Hi Cody, That is clear. Thanks! Bill

On Tue, May 19, 2015 at 1:27 PM, Cody Koeninger c...@koeninger.org wrote: If you checkpoint, the job will start from the successfully consumed offsets. If you don't checkpoint, by default it will start from the highest available offset, and you will potentially lose data. Is the link I posted, or for that matter the scaladoc, really not clear on that point? The scaladoc says: "To recover from driver failures, you have to enable checkpointing in the StreamingContext (http://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/StreamingContext.html). The information on consumed offset can be recovered from the checkpoint."

On Tue, May 19, 2015 at 2:38 PM, Bill Jay bill.jaypeter...@gmail.com wrote: If a Spark streaming job stops at 12:01 and I resume the job at 12:02, will it still start consuming the data that were produced to Kafka at 12:01? Or will it just start consuming from the current time?
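For reference, a minimal sketch of the checkpoint-based recovery Cody describes, using StreamingContext.getOrCreate; the checkpoint directory, app name, and batch interval here are placeholders, not from the thread:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///user/bill/checkpoints"  // placeholder path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("kafka-recovery-demo")
      val ssc = new StreamingContext(conf, Seconds(60))
      ssc.checkpoint(checkpointDir)
      // set up the Kafka input stream and output operations here
      ssc
    }

    // On restart, this rebuilds the context (including the consumed Kafka
    // offsets for a direct stream) from the checkpoint instead of starting
    // fresh at the highest available offset.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()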
Spark Streaming + Kafka failure recovery
Hi all, I am currently using Spark streaming to consume and save logs every hour in our production pipeline. The current setup is a crontab job that checks every minute whether the job is still there and, if not, resubmits a Spark streaming job. I am using the direct approach for the Kafka consumer. I have two questions:

1. In the direct approach, no offset is stored in ZooKeeper and no group id is specified. Can two consumers (one a Spark streaming job, the other a Kafka console consumer from the Kafka package) read from the same topic from the brokers together? I would like both of them to get all messages, i.e. publish-subscribe mode. What about two Spark streaming jobs reading from the same topic?

2. How do we avoid data loss if a Spark job is killed? Does checkpointing serve this purpose? The default behavior of Spark streaming is to read the latest logs. However, if a job is killed, can the new job resume from where the old one left off, to avoid losing logs?

Thanks! Bill
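For context, a sketch of the direct-approach setup the thread is about, following the spark-streaming-kafka 1.3 API; the broker list and topic name are placeholders, and an existing StreamingContext `ssc` is assumed:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // No ZooKeeper-based consumer group is involved; each direct stream
    // independently reads all partitions of the topic, so a second job or a
    // console consumer sees the same messages (publish-subscribe behavior).
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("logs")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)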
Re: Spark Streaming + Kafka failure recovery
If a Spark streaming job stops at 12:01 and I resume the job at 12:02, will it still start consuming the data that were produced to Kafka at 12:01? Or will it just start consuming from the current time?

On Tue, May 19, 2015 at 10:58 AM, Cody Koeninger c...@koeninger.org wrote: Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ? 1. There's nothing preventing that. 2. Checkpointing will give you at-least-once semantics, provided you have sufficient Kafka retention. Be aware that checkpoints aren't recoverable if you upgrade code.
Partition number of Spark Streaming Kafka receiver-based approach
Hi all, I am reading the docs for the receiver-based Kafka consumer. The last parameter of KafkaUtils.createStream is the per-topic number of Kafka partitions to consume. My question is: does the number of partitions per topic in this parameter need to match the number of partitions in Kafka? For example, I have two topics, topic1 with 3 partitions and topic2 with 4 partitions. If I specify 2 for topic1 and 3 for topic2 and feed them to the createStream function, will there be data loss, or will it just be inefficient? Thanks! Bill
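A sketch of the call in question, with placeholder ZooKeeper and group settings. Note this reading of the parameter is an assumption worth verifying against the createStream scaladoc: the map value is the number of consumer threads per topic inside the receiver, and with the high-level consumer, fewer threads than partitions should mean some threads read several partitions (less parallelism rather than lost data):

    import org.apache.spark.streaming.kafka.KafkaUtils

    // Map value = consumer threads per topic, not the broker-side partition
    // count; 2 threads for topic1's 3 partitions and 3 for topic2's 4.
    val topicMap = Map("topic1" -> 2, "topic2" -> 3)
    val lines = KafkaUtils.createStream(ssc, "zk1:2181", "my-group", topicMap).map(_._2)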
Re: Too many open files when using Spark to consume messages from Kafka
The data ingestion is in the outermost portion of the foreachRDD block. Although I now close the JDBC statement, the same exception happened again. It seems it is not related to the data ingestion part.

On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger c...@koeninger.org wrote: Use lsof to see what files are actually being held open. That stacktrace looks to me like it's from the driver, not executors. Where in foreach is it being called? The outermost portion of foreachRDD runs in the driver, the innermost portion runs in the executors. From the docs: https://spark.apache.org/docs/latest/streaming-programming-guide.html

    dstream.foreachRDD { rdd =>
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.send(record)  // executed at the worker
      }
    }

@td I've specifically looked at kafka socket connections for the standard 1.3 code vs my branch that has cached connections. The standard non-caching code has very short-lived connections. I've had jobs running for a month at a time, including ones writing to mysql. Not saying it's impossible, but I'd think we need some evidence before speculating this has anything to do with it.
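A sketch of the per-partition variant that the docs snippet above is usually paired with: creating the connection inside foreachPartition keeps it on the executor and bounds how many connections each batch opens (createNewConnection is a placeholder factory, as in the docs):

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Runs on an executor: one connection per partition, closed promptly
        // so file descriptors are not leaked across batches.
        val connection = createNewConnection()  // placeholder factory
        try {
          partition.foreach(record => connection.send(record))
        } finally {
          connection.close()
        }
      }
    }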
Re: Too many open files when using Spark to consume messages from Kafka
I terminated the old job and started a new one. The Spark streaming job has now been running for 2 hours, and when I use lsof, I do not see many files related to the Spark job. BTW, I am running Spark streaming in local[2] mode. The batch interval is 5 seconds and it reads around 50 lines per batch.

On Thu, Apr 30, 2015 at 11:15 AM, Cody Koeninger c...@koeninger.org wrote: Did you use lsof to see what files were opened during the job?
Re: Too many open files when using Spark to consume messages from Kafka
Thanks for the suggestion. I ran the command and the limit is 1024. Based on my understanding, the connector to Kafka should not open so many files. Do you think there is possible socket leakage? BTW, in every batch (the batch interval is 5 seconds), I output some results to mysql:

    def ingestToMysql(data: Array[String]) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        if (conn != null) {
          conn.close()
        }
      }
    }

I am not sure whether the leakage originates from the Kafka connector or the SQL connections. Bill

On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote: Can you run the command 'ulimit -n' to see the current limit? To configure ulimit settings on Ubuntu, edit /etc/security/limits.conf. Cheers
Too many open files when using Spark to consume messages from Kafka
Hi all, I am using the direct approach to receive real-time data from Kafka, as described in the following link: https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html My code follows the direct word count example: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala

After around 12 hours, I got the following error messages in the Spark log:

    15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 143033869 ms
    org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files)
      at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
      at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
      at scala.Option.orElse(Option.scala:257)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
      at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
      ... (the same getOrCompute/MappedDStream frames repeat for each stage of the DStream lineage)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
      at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
Re: Too many open files when using Spark to consume messages from Kafka
This function is called in foreachRDD. I think it should be running in the executors. I added the statement.close() to the code and it is running. I will let you know if this fixes the issue.

On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote: Is the function ingestToMysql running on the driver or on the executors? Accordingly you can try debugging while running in a distributed manner, with and without calling the function. If you don't get too many open files without calling ingestToMysql(), the problem is likely to be in ingestToMysql(). If you get the problem even without calling ingestToMysql(), then the problem may be in Kafka. If the problem is occurring in the driver, then it's the DirectKafkaInputDStream code. If the problem is occurring in the executor, then the problem is in KafkaRDD. TD

On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote: Maybe add statement.close() in a finally block? Streaming / Kafka experts may have better insight.
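A sketch of the ingest function reworked along the lines Ted suggests, closing both the Statement and the Connection in a finally block. It assumes `logger` and the `loggingserver1` table from Bill's snippet, and that each element of `data` is already a parenthesized value tuple; none of that is confirmed by the thread:

    def ingestToMysql(data: Array[String]): Unit = {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      val sql = "insert into loggingserver1 values " + data.mkString(",") + ";"
      var conn: java.sql.Connection = null
      var statement: java.sql.Statement = null
      try {
        conn = java.sql.DriverManager.getConnection(url)
        statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage)  // logger assumed from the original snippet
      } finally {
        // Close in reverse order of creation; a leaked Statement can hold a
        // socket (and thus a file descriptor) open across batches.
        if (statement != null) statement.close()
        if (conn != null) conn.close()
      }
    }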
Re: Lifecycle of RDD in spark-streaming
Gerard, That is a good observation. However, the strange thing I am seeing is that if I use MEMORY_AND_DISK_SER, the job fails even earlier. In my case, it takes 10 seconds to process the data of each batch, and the batch interval is one minute. It fails after 10 hours with the "cannot compute split" error. Bill

On Thu, Nov 27, 2014 at 3:31 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi TD, We also struggled with this error for a long while. The recurring scenario is when the job takes longer to compute than the batch interval and a backlog starts to pile up. Hint: check whether the DStream storage level is set to MEMORY_ONLY_SER; if memory runs out, you will get a 'Cannot compute split: Missing block ...'. What I don't know ATM is whether the new data is dropped or the LRU policy removes data in the system in favor of the incoming data. In any case, the DStream processing still thinks the data is there at the moment the job is scheduled to run, and fails to run. In our case, changing storage to MEMORY_AND_DISK_SER solved the problem and our streaming job can get through tough times without issues. Regularly checking 'scheduling delay' and 'total delay' on the Streaming tab in the UI is a must. (And soon we will have that on the metrics report as well!! :-) ) -kr, Gerard.
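A sketch of the two remedies discussed in this thread, Gerard's storage-level change and TD's streamingContext.remember(); the duration and the Kafka arguments (ssc, zkQuorum, group, topicMap) are placeholders carried over from the surrounding examples:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Minutes
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Spill received blocks to disk instead of dropping them when memory
    // fills, so a temporary backlog does not trigger "Cannot compute split".
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
      StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    // Ask Spark Streaming to keep generated RDDs (and their input data)
    // around longer than strictly required, e.g. when RDDs are also used
    // outside the DStream logic.
    ssc.remember(Minutes(5))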
Re: Lifecycle of RDD in spark-streaming
Just to add one more point: if Spark streaming knows when an RDD will not be used any more, I believe Spark will not try to retrieve data it will not use again. However, in practice, I often encounter the "cannot compute split" error. Based on my understanding, this is because Spark cleared out data that would be used again. In my case, the data volume is much smaller (30M/s, with a batch size of 60 seconds) than the memory (20G per executor). If Spark only cleans up RDDs that are no longer in use, I would expect this error not to happen. Bill

On Wed, Nov 26, 2014 at 4:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Let me further clarify Lalit's point on when RDDs generated by DStreams are destroyed, and hopefully that will answer your original questions.

1. How does spark (streaming) guarantee that all the actions are taken on each input rdd/batch? This isn't hard! By the time you call streamingContext.start(), you have already set up the output operations (foreachRDD, saveAs***Files, etc.) that you want to do with the DStream. There are RDD actions inside the DStream output operations that need to be done every batch interval. So all the system does is this - after every batch interval, put all the output operations (which will call RDD actions) in a job queue, and then keep executing stuff in the queue. If there is any failure in running the jobs, the streaming context will stop.

2. How does spark determine that the life-cycle of an rdd is complete? Is there any chance that an RDD will be cleaned out of RAM before all actions are taken on it? Spark Streaming knows when all the processing related to batch T has been completed. It also keeps track of how much of the previous RDDs it needs to remember and keep around in the cache, based on what DStream operations have been done. For example, if you are using a 1-minute window, the system knows that it needs to keep around at least the last 1 minute of data in memory. Accordingly, it cleans up the input data (actively unpersisted) and cached RDDs (simply dereferenced from DStream metadata, and then Spark unpersists them as the RDD objects get garbage-collected by the JVM). TD

On Wed, Nov 26, 2014 at 10:10 AM, tian zhang tzhang...@yahoo.com.invalid wrote: I have found this paper, which seems to answer most of the questions about RDD life duration: https://www.cs.berkeley.edu/~matei/papers/2012/hotcloud_spark_streaming.pdf Tian

On Tuesday, November 25, 2014 4:02 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hey Experts, I wanted to understand in detail the lifecycle of rdd(s) in a streaming app. From my current understanding: - an rdd gets created out of the realtime input stream; - transform functions are applied in a lazy fashion on the RDD to transform it into other rdd(s); - actions are taken on the final transformed rdds to get the data out of the system. Also, rdd(s) are stored in the cluster's RAM (disk if configured so) and are cleaned in LRU fashion. So I have the following questions on the same. - How does spark (streaming) guarantee that all the actions are taken on each input rdd/batch? - How does spark determine that the life-cycle of an rdd is complete? Is there any chance that an RDD will be cleaned out of RAM before all actions are taken on it? Thanks in advance for all your help. Also, I'm relatively new to scala and spark, so pardon me in case these are naive questions/assumptions. -- Thanks & Regards, Mukesh Jha
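As a concrete instance of TD's window example, a sketch of a 1-minute window over an existing DStream `lines`; the 10-second batch interval is an assumption for illustration:

    import org.apache.spark.streaming.{Minutes, Seconds}

    // Each output covers the last 1 minute of data, sliding every 10 seconds,
    // so Spark Streaming infers it must retain at least the last six batches
    // of input before cleaning them up.
    val windowed = lines.window(Minutes(1), Seconds(10))
    windowed.count().print()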
Re: Lifecycle of RDD in spark-streaming
Hi TD, I am using Spark Streaming to consume data from Kafka, do some aggregation, and ingest the results into RDS. I do use foreachRDD in the program. I am planning to use Spark streaming in our production pipeline and it performs well in generating the results. Unfortunately, we plan to run the production pipeline 24/7, and the Spark streaming job usually fails after 8-20 hours due to the "cannot compute split" exception. In other cases, the Kafka receiver fails and the program runs without producing any result. In my pipeline, the batch size is 1 minute and the data volume per minute from Kafka is 3G. I have been struggling with this issue for more than a month. It would be great if you could provide some solutions for this. Thanks! Bill

On Wed, Nov 26, 2014 at 5:35 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you elaborate on the usage pattern that leads to "cannot compute split"? Are you using the RDDs generated by DStream outside the DStream logic? Something like running interactive Spark jobs (independent of the Spark Streaming ones) on RDDs generated by DStreams? If that is the case, what is happening is that Spark Streaming is not aware that some of the RDDs (and the raw input data they will need) will be used by Spark jobs unrelated to Spark Streaming. Hence Spark Streaming will actively clear off the raw data, leading to failures in the unrelated Spark jobs using that data. In case this is your use case, the cleanest way to solve this is to ask Spark Streaming to remember stuff for longer, using streamingContext.remember(duration). This will ensure that Spark Streaming keeps around all the stuff for at least that duration. Hope this helps. TD
Re: Error when Spark streaming consumes from Kafka
Hi Dibyendu, Thank you for the answer. I will try the Spark-Kafka consumer. Bill

On Sat, Nov 22, 2014 at 9:15 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: I believe this is something to do with how the Kafka High Level API manages consumers within a consumer group and how it re-balances during failures. You can find some mention in this Kafka wiki: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design Due to various issues in the Kafka High Level API, Kafka is moving the High Level Consumer API to a completely new set of APIs in Kafka 0.9. Other than this coordination issue, the High Level consumer also has data loss issues. You can try this Spark-Kafka consumer, which uses the Low Level Simple Consumer API; it is more performant and has no data loss scenarios: https://github.com/dibbhatt/kafka-spark-consumer Regards, Dibyendu
Error when Spark streaming consumes from Kafka
Hi all, I am using Spark to consume from Kafka. However, after the job has run for several hours, I saw the following failure of an executor:

    kafka.common.ConsumerRebalanceFailedException: group-1416624735998_ip-172-31-5-242.ec2.internal-1416648124230-547d2c31 can't rebalance after 4 retries
      kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
      kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
      kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
      kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
      org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
      org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
      org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
      org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
      org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
      org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
      org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
      org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
      org.apache.spark.scheduler.Task.run(Task.scala:54)
      org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      java.lang.Thread.run(Thread.java:745)

Does anyone know the reason for this exception? Thanks! Bill
Re: Spark streaming cannot receive any message from Kafka
Hi Jerry, I looked at the KafkaUtils.createStream API and found that spark.default.parallelism is actually specified in SparkConf instead. I do not remember the exact stack of the exception, but the exception occurred when createStream was called without spark.default.parallelism specified. The error message basically shows a failure to parse an empty string into an Int when spark.default.parallelism is not set. Bill

On Mon, Nov 17, 2014 at 4:45 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Bill, Would you mind describing what you found a little more specifically? I'm not sure there's a parameter in KafkaUtils.createStream where you can specify the Spark parallelism. Also, what is the exception stack? Thanks, Jerry
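A sketch of the workaround Bill describes, setting spark.default.parallelism on the SparkConf before building the streaming context; the value 8 and the app name are arbitrary placeholders, and the parse-failure behavior is taken from Bill's report rather than independently verified:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Setting the property explicitly avoids the empty-string-to-Int parse
    // failure Bill saw at Kafka stream creation when it was absent.
    val conf = new SparkConf()
      .setAppName("kafka-stream")
      .set("spark.default.parallelism", "8")
    val ssc = new StreamingContext(conf, Seconds(60))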
Spark streaming: java.io.IOException: Version Mismatch (Expected: 28, Received: 18245)
Hi all, I am running a Spark Streaming job. It was able to produce the correct results up to some point. Later on, the job was still running but producing no results. I checked the Spark streaming UI and found that 4 tasks of a stage had failed. The error message showed: "Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 400048, ip-172-31-13-130.ec2.internal): ExecutorLostFailure (executor lost) Driver stacktrace:" I further clicked into the stage and found the 4 executors running the stage had the error message: ExecutorLostFailure (executor lost). The stage that failed was "runJob at ReceiverTracker.scala:275" (http://ec2-54-172-118-237.compute-1.amazonaws.com:9046/proxy/application_1415902783817_0019/stages/stage?id=2&attempt=0), which is the stage that keeps receiving messages from Kafka. I guess that is why the job does not produce results any more. To investigate, I logged into one of the executor machines and checked the Hadoop log. The log file contains many instances of the exception message: java.io.IOException: Version Mismatch (Expected: 28, Received: 18245) This streaming job reads from Kafka and produces aggregation results. After this stage failure, the job is still running but there is no data shuffle, as seen in the Spark UI. The amount of time this job can run correctly varies from job to job. Does anyone have an idea why this Spark Streaming job hit this exception, and why it cannot recover from the stage failure? Thanks! Bill
Re: Spark streaming cannot receive any message from Kafka
Hi all, I found the reason for this issue. It seems that in the new version, if I do not specify spark.default.parallelism, there will be an exception during the Kafka stream creation stage in KafkaUtils.createStream. In the previous versions, it seems Spark would use a default value. Thanks! Bill

On Thu, Nov 13, 2014 at 5:00 AM, Helena Edelson helena.edel...@datastax.com wrote: I encounter no issues with streaming from kafka to spark in 1.1.0. Do you perhaps have a version conflict? Helena

On Nov 13, 2014 12:55 AM, Jay Vyas jayunit100.apa...@gmail.com wrote: Yup, it is very important that n > 1 for spark streaming jobs; if local, use local[2]. The thing to remember is that your spark receiver will take a thread to itself to produce data, so you need another thread to consume it. In a cluster manager like yarn or mesos, the word "thread" is not used anymore; I guess it has a different meaning - you need 2 or more free compute slots, and that should be guaranteed by looking to see how many free node managers are running, etc.

On Nov 12, 2014, at 7:53 PM, Shao, Saisai saisai.s...@intel.com wrote: Did you configure the Spark master as local? It should be local[n], n > 1, for local mode. Besides, there's a Kafka wordcount example in the Spark Streaming examples, you can try that. I've tested with the latest master, it's OK. Thanks, Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Thursday, November 13, 2014 8:45 AM To: Bill Jay Cc: u...@spark.incubator.apache.org Subject: Re: Spark streaming cannot receive any message from Kafka

Bill, "However, when I am currently using Spark 1.1.0. the Spark streaming job cannot receive any messages from Kafka. I have not made any change to the code." Do you see any suspicious messages in the log output? Tobias
Spark streaming cannot receive any message from Kafka
Hi all, I have a Spark streaming job which constantly receives messages from Kafka. I was using Spark 1.0.2 and the job had been running for a month. However, now that I am using Spark 1.1.0, the Spark streaming job cannot receive any messages from Kafka. I have not made any changes to the code. Below please find the code snippet for the Kafka consumer:

    var Array(zkQuorum, topics, mysqlTable) = args
    val currentTime: Long = System.currentTimeMillis
    val group = "my-group-" + currentTime.toString
    println(topics)
    println(zkQuorum)
    val numThreads = 1
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    ssc = new StreamingContext(conf, Seconds(batch))
    ssc.checkpoint(hadoopOutput + "checkpoint")
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val lineCounts = lines.count.saveAsTextFiles(hadoopOutput + "counts/result")

I checked the values of topics and zkQuorum and they are correct. I use the same information with kafka-console-consumer and it works correctly. Does anyone know the reason? Thanks! Bill
Re: Spark streaming cannot receive any message from Kafka
Hi all, Thanks for the information. I am running Spark streaming in a yarn cluster and the configuration should be correct. I followed the KafkaWordCount example to write the current code three months ago, and it has been working for several months. The messages are in json format. Actually, this code worked a few days ago, but now it is not working. Below please find my spark-submit script:

    SPARK_BIN=/home/hadoop/spark/bin/
    $SPARK_BIN/spark-submit \
      --class com.test \
      --master yarn-cluster \
      --deploy-mode cluster \
      --verbose \
      --driver-memory 20G \
      --executor-memory 20G \
      --executor-cores 6 \
      --num-executors $2 \
      $1 $3 $4 $5

Thanks! Bill

On Wed, Nov 12, 2014 at 4:53 PM, Shao, Saisai saisai.s...@intel.com wrote: Did you configure the Spark master as local? It should be local[n], n > 1, for local mode. Besides, there's a Kafka wordcount example in the Spark Streaming examples, you can try that. I've tested with the latest master, it's OK. Thanks, Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Thursday, November 13, 2014 8:45 AM To: Bill Jay Cc: u...@spark.incubator.apache.org Subject: Re: Spark streaming cannot receive any message from Kafka

Bill, "However, when I am currently using Spark 1.1.0. the Spark streaming job cannot receive any messages from Kafka. I have not made any change to the code." Do you see any suspicious messages in the log output? Tobias
Spark streaming job failed due to java.util.concurrent.TimeoutException
Hi all, I have a Spark streaming job that consumes data from Kafka and performs some simple operations on the data. This job runs in an EMR cluster with 10 nodes. The batch size I use is 1 minute and it takes around 10 seconds to generate the results, which are inserted into a MySQL database. However, after more than 2 days, the job failed with a list of the following errors in the log:

java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

Does anyone know the reason? Thanks! Bill
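The [30 seconds] figure matches the default Akka ask timeout in Spark 1.x, so one knob people sometimes raised for this symptom was spark.akka.askTimeout. A hedged sketch, with a placeholder app name; note this only masks the symptom if the real cause is long GC pauses or a lost executor, so the executor logs and web UI are worth checking first:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("KafkaToMySQL")           // placeholder name
  .set("spark.akka.askTimeout", "120")  // seconds; the Spark 1.x default was 30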
Re: combineByKey at ShuffledDStream.scala
The streaming program contains the following main stages: 1. receive data from Kafka; 2. preprocess the data (these are all map and filter stages); 3. group by a field; 4. process the groupBy results using map (inside this processing, I use collect and count). Thanks! Bill On Tue, Jul 22, 2014 at 10:05 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you give an idea of the streaming program? What are the rest of the transformations you are doing on the input streams? On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently running a Spark Streaming program, which consumes data from Kafka and does a group by operation on the data. I am trying to optimize the running time of the program because it looks slow to me. It seems the stage named *combineByKey at ShuffledDStream.scala:42* always takes the longest running time. And if I open this stage, I only see two executors on this stage. Does anyone have an idea of what this stage does and how to increase its speed? Thanks! Bill
Re: Spark Streaming: no job has started yet
The code is pretty long. But the main idea is to consume from Kafka, preprocess the data, and groupBy a field. I use multiple DStreams to add parallelism to the consumer. It seems that when the number of DStreams is large, this happens often. Thanks, Bill On Tue, Jul 22, 2014 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you paste the piece of code? Thanks Best Regards On Wed, Jul 23, 2014 at 1:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am running a Spark streaming job. The job hangs on one stage, which shows as follows in the web UI:

Details for Stage 4
Summary Metrics: No tasks have started yet
Tasks: No tasks have started yet

Does anyone have an idea on this? Thanks! Bill
Get Spark Streaming timestamp
Hi all, I have a question regarding Spark streaming. When we use the saveAsTextFiles function and my batch is 60 seconds, Spark will generate a series of files such as result-140614896, result-140614802, result-140614808, etc. I think this is the timestamp for the beginning of each batch. How can we extract this timestamp and use it in our code? Thanks! Bill
Re: Get Spark Streaming timestamp
Hi Tobias, It seems this parameter is an input to the function. What I am expecting is output from a function that tells me the starting or ending time of the batch. For instance, if I use saveAsTextFiles, it seems DStream will generate a batch every minute and the starting time is a complete minute (the batch size is 60 seconds). Thanks! Bill On Wed, Jul 23, 2014 at 6:56 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, Spark Streaming's DStream provides overloaded methods for transform() and foreachRDD() that allow you to access the timestamp of a batch: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream I think the timestamp is the end of the batch, not the beginning. For example, I compute the runtime by taking the difference between now() and the time I get as a parameter in foreachRDD(). Tobias On Thu, Jul 24, 2014 at 6:39 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a question regarding Spark streaming. When we use the saveAsTextFiles function and my batch is 60 seconds, Spark will generate a series of files such as result-140614896, result-140614802, result-140614808, etc. I think this is the timestamp for the beginning of each batch. How can we extract this timestamp and use it in our code? Thanks! Bill
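For reference, a short sketch of the foreachRDD overload Tobias mentions. Here lines is assumed to be a DStream[String] as in the other snippets in this archive, and the 60-second batch interval is an assumption; the Time argument carries the batch boundary in epoch milliseconds (per Tobias, the end of the batch):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

lines.foreachRDD((rdd: RDD[String], time: Time) => {
  val batchEndMs = time.milliseconds
  val batchStartMs = batchEndMs - 60 * 1000L // assumes a 60-second batch interval
  println("batch [" + batchStartMs + ", " + batchEndMs + "): " + rdd.count() + " records")
})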
Re: spark streaming rate limiting from kafka
Hi Tobias, I tried to use 10 as numPartitions. The number of executors allocated equals the number of DStreams. Therefore, it seems the parameter does not spread data into many partitions. In order to do that, it seems we have to call repartition. If numPartitions would distribute the data to multiple executors/partitions, then I would be able to save the running time incurred by repartition. Bill On Mon, Jul 21, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, numPartitions means the number of Spark partitions that the data received from Kafka will be split into. It has nothing to do with Kafka partitions, as far as I know. If you create multiple Kafka consumers, it doesn't seem like you can specify which consumer will consume which Kafka partitions. Instead, Kafka (at least with the interface that is exposed by the Spark Streaming API) will do something called rebalance and assign Kafka partitions to consumers evenly; you can see this in the client logs. When using multiple Kafka consumers with auto.offset.reset = true, please expect to run into this one: https://issues.apache.org/jira/browse/SPARK-2383 Tobias On Tue, Jul 22, 2014 at 3:40 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I am currently creating multiple DStreams to consume from different topics. How can I let each consumer consume from different partitions? I found the following signature in the Spark API:

createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
  jssc: JavaStreamingContext,
  keyTypeClass: Class[K],
  valueTypeClass: Class[V],
  keyDecoderClass: Class[U],
  valueDecoderClass: Class[T],
  kafkaParams: Map[String, String],
  topics: Map[String, Integer],
  storageLevel: StorageLevel
): JavaPairReceiverInputDStream[K, V]

(See the scaladoc pages: https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.html, https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/storage/StorageLevel.html, https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.html.)

It creates an input stream that pulls messages from a Kafka broker. The topics parameter is: *Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.* Does numPartitions mean the total number of partitions to consume from topic_name or the index of the partition? How can we specify for each createStream call which partition of the Kafka topic to consume? I think if so, I will get a lot of parallelism from the source of the data. Thanks! Bill On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You can create multiple Kafka streams to partition your topics across them, which will run multiple receivers on multiple executors. This is covered in the Spark Streaming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question, we now have the ability https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC to limit the receiving rate. It's in the master branch, and will be available in Spark 1.1. It basically sets the limits at the receiver level (so it applies to all sources) on the max records per second that will be received by the receiver.
TD On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying that, after repartition(400), you have 400 partitions on one host while the other hosts receive none of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even when I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far the Kafka DStream can read on a topic-partition (via an offset, for example)? By setting this to a small number, it will force the DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
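For reference, a hedged sketch of the multiple-receiver pattern TD points to in the streaming guide: create several Kafka streams, union them, and optionally repartition before the heavy stages. zkQuorum, group, and topicMap are assumed to be defined as in the other snippets in this archive, and the counts are illustrative:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
}
val unified = ssc.union(kafkaStreams) // one logical stream backed by five receivers
val spread = unified.repartition(100) // spread processing beyond the receiver nodes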
combineByKey at ShuffledDStream.scala
Hi all, I am currently running a Spark Streaming program, which consumes data from Kafka and does a group by operation on the data. I am trying to optimize the running time of the program because it looks slow to me. It seems the stage named *combineByKey at ShuffledDStream.scala:42* always takes the longest running time. And if I open this stage, I only see two executors on this stage. Does anyone have an idea of what this stage does and how to increase its speed? Thanks! Bill
Spark Streaming: no job has started yet
Hi all, I am running a Spark streaming job. The job hangs on one stage, which shows as follows in the web UI:

Details for Stage 4
Summary Metrics: No tasks have started yet
Tasks: No tasks have started yet

Does anyone have an idea on this? Thanks! Bill
Re: spark streaming rate limiting from kafka
Hi Tathagata, I am currently creating multiple DStreams to consume from different topics. How can I let each consumer consume from different partitions? I found the following signature in the Spark API:

createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
  jssc: JavaStreamingContext,
  keyTypeClass: Class[K],
  valueTypeClass: Class[V],
  keyDecoderClass: Class[U],
  valueDecoderClass: Class[T],
  kafkaParams: Map[String, String],
  topics: Map[String, Integer],
  storageLevel: StorageLevel
): JavaPairReceiverInputDStream[K, V]

(See the scaladoc pages: https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.html, https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/storage/StorageLevel.html, https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.html.)

It creates an input stream that pulls messages from a Kafka broker. The topics parameter is: *Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.* Does numPartitions mean the total number of partitions to consume from topic_name or the index of the partition? How can we specify for each createStream call which partition of the Kafka topic to consume? I think if so, I will get a lot of parallelism from the source of the data. Thanks! Bill On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You can create multiple Kafka streams to partition your topics across them, which will run multiple receivers on multiple executors. This is covered in the Spark Streaming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question, we now have the ability https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC to limit the receiving rate. It's in the master branch, and will be available in Spark 1.1. It basically sets the limits at the receiver level (so it applies to all sources) on the max records per second that will be received by the receiver. TD On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying that, after repartition(400), you have 400 partitions on one host while the other hosts receive none of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even when I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far the Kafka DStream can read on a topic-partition (via an offset, for example)? By setting this to a small number, it will force the DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
Re: spark streaming rate limiting from kafka
Hi Tobias, It seems that repartition can create more executors for the stages following data receiving. However, the number of executors is still far less than what I require (I specify one core for each executor). Based on the indices of the executors in the stage, I find many numbers are missing in between. For example, if I repartition(100), the indices of the executors may be 1, 3, 5, 10, etc. In the end, there may be 45 executors although I requested 100 partitions. Bill On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying that, after repartition(400), you have 400 partitions on one host while the other hosts receive none of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even when I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far the Kafka DStream can read on a topic-partition (via an offset, for example)? By setting this to a small number, it will force the DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
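A brief sketch of what Bill observes, under the assumption that lines is the input DStream: repartition fixes the number of partitions (hence tasks), not the number of distinct executors, and several of the resulting tasks can be scheduled on the same executor, which is consistent with the missing indices:

val repartitioned = lines.repartition(100) // 100 tasks per batch, but possibly fewer than 100 executors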
Re: Spark Streaming timestamps
Hi Tathagata, On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The RDD parameter in foreachRDD contains the raw/transformed data from the last batch. So when foreachRDD is called with the time parameter as 5:02:01 and the batch size is 1 minute, the RDD will contain the data received between 5:02:00 and 5:02:01. Do you mean the data between 5:02:02 and 5:02:01? The time parameter is 5:02:01. Moreover, when the program is running, it is very difficult to specify a starting time, because sometimes it is difficult to know when the program executes that line. And do we need a different time parameter for each foreachRDD, or will Spark calculate the next one according to the batch interval? If you want to do custom intervals, then I suggest the following: 1. Use 1-second batch intervals. 2. Then in foreachRDD, from 5:02:30 to 5:03:28, put all the RDDs in an ArrayBuffer/ListBuffer. 3. At 5:03:29, add the RDD to the buffer, then do a union of all the buffered RDDs and process them. So in foreachRDD, based on the time, buffer the RDDs until you reach the appropriate time; then union all the buffered RDDs and process them. TD On Thu, Jul 17, 2014 at 2:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Thanks for your answer. Please see my further question below: On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Answers inline. On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently using Spark Streaming to conduct real-time data analytics. We receive data from Kafka. We want to generate output files that contain results based on the data we receive in a specific time interval. I have several questions on Spark Streaming's timestamps: 1) If I use saveAsTextFiles, it seems Spark streaming will generate files at complete minutes, such as 5:00:01, 5:00:01 (converted from Unix time), etc. Does this mean the results are based on the data from 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc., or do the timestamps just mean the times the files were generated? The file named 5:00:01 contains results from data received between 5:00:00 and 5:00:01 (based on the system time of the cluster). 2) If I do not use saveAsTextFiles, how do I get the exact time interval of the RDD when I use foreachRDD to do custom output of the results? There is a version of foreachRDD which allows you to specify a function that takes in a Time object. 3) How can we specify the starting time of the batches? What do you mean? Batches are timed based on the system time of the cluster. I would like to control the starting time and ending time of each batch. For example, if I use saveAsTextFiles as the output method and the batch size is 1 minute, Spark will align time intervals to complete minutes, such as 5:01:00, 5:02:00, 5:03:00. It will not have results at times such as 5:01:03, 5:02:03, 5:03:03, etc. My goal is to generate output for a customized interval, such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc. I checked the API of foreachRDD with the time parameter. It seems there is no explanation of what that parameter means. Does it mean the starting time of the first batch? Thanks! Bill
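A hedged sketch of TD's buffering recipe, assuming 1-second batches, a hypothetical processInterval function, and an interval that closes at hh:mm:30 as in his example; the exact boundary arithmetic depends on how the custom intervals align with batch times. The buffer can live in a driver-side variable because the foreachRDD body runs on the driver:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val buffered = new ArrayBuffer[RDD[String]]()

lines.foreachRDD((rdd: RDD[String], time: Time) => {
  buffered += rdd
  if (time.milliseconds % (60 * 1000L) == 30 * 1000L) { // assumed boundary: hh:mm:30
    val interval = ssc.sparkContext.union(buffered)     // union all buffered batches
    processInterval(interval)                           // hypothetical user function
    buffered.clear()
  }
})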
Re: Error: No space left on device
Hi, I also have some issues with repartition. In my program, I consume data from Kafka. After I consume the data, I use repartition(N). However, although I set N to be 120, there are only around 18 executors allocated for my reduce stage. I am not sure how the repartition command works to ensure the parallelism. Bill On Thu, Jul 17, 2014 at 12:56 AM, Xiangrui Meng men...@gmail.com wrote: Set N to be the total number of cores on the cluster or less. sc.textFile doesn't always give you that number; it depends on the block size. For MovieLens, I think the default behavior should be 2~3 partitions. You need to call repartition to ensure the right number of partitions. Which EC2 instance type did you use? I usually use m3.2xlarge or c? instances that come with SSD and 1G or 10G network. For those instances, you should see local drives mounted at /mnt, /mnt2, /mnt3, ... Make sure there was no error when you used the ec2 script to launch the cluster. It is a little strange to see 94% of / used on a slave. Maybe shuffle data went to /. I'm not sure which settings went wrong. I recommend trying to re-launch a cluster with m3.2xlarge instances and using the default settings (don't set anything in SparkConf). Submit the application with --driver-memory 20g. The running times are slower than what I remember, but it depends on the instance type. Best, Xiangrui On Wed, Jul 16, 2014 at 10:18 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, I will try this shortly. When using N partitions, do you recommend N be the number of cores on each slave or the number of cores on the master? Forgive my ignorance, but is this best achieved as an argument to sc.textFile? The slaves on the EC2 clusters start with only 8gb of storage, and it doesn't seem that /mnt/spark and /mnt2/spark are mounted anywhere else by default. Looking at spark-ec2/setup-slaves.sh, it appears that these are only mounted if the instance type begins with r3. (Or am I not reading that right?) My slaves are a different instance type, and currently look like this:

Filesystem      Size  Used Avail Use% Mounted on
/dev/xvda1      7.9G  7.3G  515M  94% /
tmpfs           7.4G  4.0K  7.4G   1% /dev/shm
/dev/xvdv       500G  2.5G  498G   1% /vol

I have been able to finish ALS on MovieLens 10M only twice, taking 221s and 315s for 20 iterations at K=20 and lambda=0.01. Does that timing sound about right, or does it point to a poor configuration? The same script with MovieLens 1M runs fine in about 30-40s with the same settings. (In both cases I'm training on 70% of the data.) Thanks for your help! Chris On Wed, Jul 16, 2014 at 4:29 PM, Xiangrui Meng men...@gmail.com wrote: For ALS, I would recommend repartitioning the ratings to match the number of CPU cores or even less. ALS is not computation heavy for small k but communication heavy. Having a small number of partitions may help. For EC2 clusters, we use /mnt/spark and /mnt2/spark as the default local directories because they are local hard drives. Did your last run of ALS on MovieLens 10M-100K with the default settings succeed? -Xiangrui On Wed, Jul 16, 2014 at 8:00 AM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, I accidentally did not send df -i for the master node.
Here it is at the moment of failure:

Filesystem        Inodes  IUsed     IFree IUse% Mounted on
/dev/xvda1        524288 280938    243350   54% /
tmpfs            3845409      1   3845408    1% /dev/shm
/dev/xvdb       10002432   1027  10001405    1% /mnt
/dev/xvdf       10002432     16  10002416    1% /mnt2
/dev/xvdv      524288000     13 524287987    1% /vol

I am using the default settings now, but is there a way to make sure that the proper directories are being used? How many blocks/partitions do you recommend? Chris On Wed, Jul 16, 2014 at 1:09 AM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, Here is the result on the master node:

$ df -i
Filesystem       Inodes  IUsed     IFree IUse% Mounted on
/dev/xvda1       524288 273997    250291   53% /
tmpfs           1917974      1   1917973    1% /dev/shm
/dev/xvdv     524288000     30 524287970    1% /vol

I have reproduced the error while using the MovieLens 10M data set on a newly created cluster. Thanks for the help. Chris On Wed, Jul 16, 2014 at 12:22 AM, Xiangrui Meng men...@gmail.com wrote: Hi Chris, Could you also try `df -i` on the master node? How many blocks/partitions did you set? In the current implementation, ALS doesn't clean the shuffle data because the operations are chained together. But it shouldn't run out of disk space on the MovieLens dataset, which is small. The spark-ec2 script sets /mnt/spark and /mnt/spark2 as the local.dir by
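For reference, a small hedged sketch of the local-directory setting discussed above; the paths are the EC2 instance-store mounts Xiangrui mentions, and spark-ec2 normally configures this itself, so treat it as a fallback check rather than the fix. The app name is a placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MovieLensALS") // placeholder name
  .set("spark.local.dir", "/mnt/spark,/mnt2/spark") // shuffle spill goes here, not the small root volume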
Re: Spark Streaming timestamps
Hi Tathagata, Thanks for your answer. Please see my further question below: On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Answers inline. On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently using Spark Streaming to conduct real-time data analytics. We receive data from Kafka. We want to generate output files that contain results based on the data we receive in a specific time interval. I have several questions on Spark Streaming's timestamps: 1) If I use saveAsTextFiles, it seems Spark streaming will generate files at complete minutes, such as 5:00:01, 5:00:01 (converted from Unix time), etc. Does this mean the results are based on the data from 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc., or do the timestamps just mean the times the files were generated? The file named 5:00:01 contains results from data received between 5:00:00 and 5:00:01 (based on the system time of the cluster). 2) If I do not use saveAsTextFiles, how do I get the exact time interval of the RDD when I use foreachRDD to do custom output of the results? There is a version of foreachRDD which allows you to specify a function that takes in a Time object. 3) How can we specify the starting time of the batches? What do you mean? Batches are timed based on the system time of the cluster. I would like to control the starting time and ending time of each batch. For example, if I use saveAsTextFiles as the output method and the batch size is 1 minute, Spark will align time intervals to complete minutes, such as 5:01:00, 5:02:00, 5:03:00. It will not have results at times such as 5:01:03, 5:02:03, 5:03:03, etc. My goal is to generate output for a customized interval, such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc. I checked the API of foreachRDD with the time parameter. It seems there is no explanation of what that parameter means. Does it mean the starting time of the first batch? Thanks! Bill
Re: spark streaming rate limiting from kafka
I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even when I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far the Kafka DStream can read on a topic-partition (via an offset, for example)? By setting this to a small number, it will force the DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
Re: Number of executors change during job running
Hi Tathagata, I have tried the repartition method. The reduce stage first had 2 executors and then it had around 85 executors. I specified repartition(300), and each executor was given 2 cores when I submitted the job. This shows that repartition works to increase the number of executors. However, the running time was still around 50 seconds although I only did a simple groupBy operation. I think repartition may consume part of the running time. Considering the input source is Kafka, is there a way to make the program even faster? Thanks! On Mon, Jul 14, 2014 at 3:22 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you give me a screen shot of the stages page in the web UI, the Spark logs, and the code that is causing this behavior? This seems quite weird to me. TD On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, It seems repartition does not necessarily force Spark to distribute the data to different executors. I have launched a new job which uses repartition right after I receive data from Kafka. For the first two batches, the reduce stage used more than 80 executors. Starting from the third batch, there were always only 2 executors in the reduce task (combineByKey). Even with the first batch, which used more than 80 executors, it took 2.4 mins to finish the reduce stage for a very small amount of data. Bill On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das tathagata.das1...@gmail.com wrote: After using repartition(300), how many executors did it run on? By the way, repartition(300) means it will divide the shuffled data into 300 partitions. Since there are many cores on each of the 300 machines/executors, these partitions (each requiring a core) may not be spread across all 300 executors. Hence, if you really want to spread it across all 300 executors, you may have to bump up the partitions even more. However, increasing the partitions too much may not be beneficial, and you will have to play around with the number to figure out the sweet spot that reduces the time to process the stage / time to process the whole batch. TD On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Do you mean that the data is not shuffled until the reduce stage? That means groupBy still only uses 2 machines? I think I used repartition(300) after I read the data from Kafka into the DStream. It seems that it did not guarantee that the map or reduce stages would be run on 300 machines. I am currently trying to create 100 DStreams from KafkaUtils.createStream and union them. Now the reduce stages have around 80 machines for all the batches. However, this method introduces many DStreams. It would be good if we could control the number of executors in the groupBy operation, because the calculation needs to be finished within 1 minute for different sizes of input data based on our production needs. Thanks! Bill On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Aah, I get it now. That is because the input data stream is replicated on two machines, so by locality the data is processed on those two machines. So the map stage on the data uses 2 executors, but the reduce stage (after groupByKey), the saveAsTextFiles, would use 300 tasks. And the default parallelism takes effect only when the data is explicitly shuffled around. You can fix this by explicitly repartitioning the data:
inputDStream.repartition(partitions)

This is covered in the streaming tuning guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving TD On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi folks, I just ran another job that only received data from Kafka, did some filtering, and then saved the results as text files in HDFS. There was no reducing work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2, although I specified 300 executors in the job submission. As a result, the simple save-file action took more than 2 minutes. Do you have any idea how Spark determines the number of executors for different stages? Thanks! Bill On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just a one-to-one mapping, which should not noticeably increase the running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers, and two of the topics have 240k lines each minute. The other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300.

val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2
Spark Streaming timestamps
Hi all, I am currently using Spark Streaming to conduct real-time data analytics. We receive data from Kafka. We want to generate output files that contain results based on the data we receive in a specific time interval. I have several questions on Spark Streaming's timestamps: 1) If I use saveAsTextFiles, it seems Spark streaming will generate files at complete minutes, such as 5:00:01, 5:00:01 (converted from Unix time), etc. Does this mean the results are based on the data from 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc., or do the timestamps just mean the times the files were generated? 2) If I do not use saveAsTextFiles, how do I get the exact time interval of the RDD when I use foreachRDD to do custom output of the results? 3) How can we specify the starting time of the batches? Thanks! Bill
Re: Number of executors change during job running
Hi Tathagata, It seems repartition does not necessarily force Spark to distribute the data to different executors. I have launched a new job which uses repartition right after I receive data from Kafka. For the first two batches, the reduce stage used more than 80 executors. Starting from the third batch, there were always only 2 executors in the reduce task (combineByKey). Even with the first batch, which used more than 80 executors, it took 2.4 mins to finish the reduce stage for a very small amount of data. Bill On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das tathagata.das1...@gmail.com wrote: After using repartition(300), how many executors did it run on? By the way, repartition(300) means it will divide the shuffled data into 300 partitions. Since there are many cores on each of the 300 machines/executors, these partitions (each requiring a core) may not be spread across all 300 executors. Hence, if you really want to spread it across all 300 executors, you may have to bump up the partitions even more. However, increasing the partitions too much may not be beneficial, and you will have to play around with the number to figure out the sweet spot that reduces the time to process the stage / time to process the whole batch. TD On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Do you mean that the data is not shuffled until the reduce stage? That means groupBy still only uses 2 machines? I think I used repartition(300) after I read the data from Kafka into the DStream. It seems that it did not guarantee that the map or reduce stages would be run on 300 machines. I am currently trying to create 100 DStreams from KafkaUtils.createStream and union them. Now the reduce stages have around 80 machines for all the batches. However, this method introduces many DStreams. It would be good if we could control the number of executors in the groupBy operation, because the calculation needs to be finished within 1 minute for different sizes of input data based on our production needs. Thanks! Bill On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Aah, I get it now. That is because the input data stream is replicated on two machines, so by locality the data is processed on those two machines. So the map stage on the data uses 2 executors, but the reduce stage (after groupByKey), the saveAsTextFiles, would use 300 tasks. And the default parallelism takes effect only when the data is explicitly shuffled around. You can fix this by explicitly repartitioning the data:

inputDStream.repartition(partitions)

This is covered in the streaming tuning guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving TD On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi folks, I just ran another job that only received data from Kafka, did some filtering, and then saved the results as text files in HDFS. There was no reducing work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2, although I specified 300 executors in the job submission. As a result, the simple save-file action took more than 2 minutes. Do you have any idea how Spark determines the number of executors for different stages? Thanks! Bill On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just a one-to-one mapping, which should not noticeably increase the running time.
The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers, and two of the topics have 240k lines each minute. The other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300.

val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString,
  timestampToUTCUnix(data("time").toString),
  timestampToUTCUnix(data("local_time").toString),
  data("id2").toString, data("id3").toString,
  data("log_type").toString, data("sub_log_type").toString))
val timeDiff = 3600L
val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)
val
Re: Number of executors change during job running
Hi Praveen, I did not change the number of total executors. I specified 300 as the number of executors when I submitted the job. However, for some stages, the number of executors is very small, leading to long calculation times even for a small data set. That means not all executors were used for some stages. If I go to the details of the running times of different executors, I find that some of them have very low running times while a few have very long running times, leading to a long overall running time. Another point I noticed is that the number of completed tasks is sometimes larger than the number of total tasks. That means the job is sometimes still running in some stages although all the tasks have finished. These are the two behaviors I observed that may be related to the unusual running time. Bill On Thu, Jul 10, 2014 at 11:26 PM, Praveen Seluka psel...@qubole.com wrote: If I understand correctly, you cannot change the number of executors at runtime, right (correct me if I am wrong)? It's defined when we start the application and fixed. Do you mean the number of tasks? On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you try setting the number of partitions in all the shuffle-based DStream operations explicitly? It may be the case that the default parallelism (that is, spark.default.parallelism) is probably not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It will show the breakdown of time for each task, including GC times, etc. That might give some indication. TD On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I set the default parallelism as 300 in my configuration file. Sometimes there are more executors in a job. However, it is still slow. And I further observed that most executors take less than 20 seconds but two of them take much longer, such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the group by operation takes more than 3 minutes. Thanks! Bill On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you specifying the number of reducers in all the DStream.ByKey operations? If the number of reducers is not set, then the number of reducers used in the stages can keep changing across batches. TD On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a Spark streaming job running on YARN. It consumes data from Kafka and groups the data by a certain field. The data size is 480k lines per minute, where the batch size is 1 minute. For some batches, the program sometimes takes more than 3 minutes to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specified 300 as the partition number for the groupBy. When I checked the slow stage *combineByKey at ShuffledDStream.scala:42*, there were sometimes 2 executors allocated for this stage. However, during other batches, the executors can be several hundred for the same stage, which means the number of executors for the same operations changes. Does anyone know how Spark allocates the number of executors for different stages and how to increase the efficiency of the task? Thanks! Bill
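A hedged sketch of TD's advice in code form: pass an explicit partition count to every *ByKey operation so the reducer parallelism cannot drift between batches. The keying function here is purely illustrative, and lines is assumed to be the input DStream[String]:

import org.apache.spark.streaming.StreamingContext._ // pair-DStream operations in Spark 1.x

val pairs = lines.map(line => (line, 1))   // illustrative keying of the input
val counts = pairs.reduceByKey(_ + _, 300) // reducer parallelism pinned at 300
val groups = pairs.groupByKey(300)         // same for groupByKey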
Re: Join two Spark Streaming
Hi Tathagata, Thanks for the solution. Actually, I will use the number of unique integers in the batch instead of the accumulated number of unique integers. I do have two questions about your code: 1. Why do we need uniqueValuesRDD? Why do we need to call uniqueValuesRDD.checkpoint()? 2. Where is distinctValues defined? Bill On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Do you want to continuously maintain the set of unique integers seen since the beginning of the stream?

var uniqueValuesRDD: RDD[Int] = ...

dstreamOfIntegers.transform(newDataRDD => {
  val newUniqueValuesRDD = newDataRDD.union(distinctValues).distinct
  uniqueValuesRDD = newUniqueValuesRDD
  // periodically call uniqueValuesRDD.checkpoint()
  val uniqueCount = uniqueValuesRDD.count()
  newDataRDD.map(x => x / uniqueCount)
})

On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am working on a pipeline that needs to join two Spark streams. The input is a stream of integers. And the output is the number of each integer's appearances divided by the total number of unique integers. Suppose the input is: 1 2 3 1 2 2. There are 3 unique integers and 1 appears twice. Therefore, the output for the integer 1 will be (1, 0.67). Since the input is from a stream, it seems we need to first join the appearances of the integers and the total number of unique integers and then do a calculation using map. I am thinking of adding a dummy key to both streams and using join. However, a Cartesian product matches the application here better. How can we do this efficiently? Thanks! Bill
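Since Bill's follow-up says he wants the uniques per batch rather than accumulated over the stream, here is a hedged sketch that sidesteps joining two streams by computing both quantities inside one transform; dstreamOfIntegers is the DStream[Int] from TD's snippet:

import org.apache.spark.SparkContext._ // pair-RDD operations in Spark 1.x

val ratios = dstreamOfIntegers.transform { rdd =>
  val uniqueCount = rdd.distinct().count() // unique integers in this batch
  rdd.map(x => (x, 1))
     .reduceByKey(_ + _)                   // appearances per integer
     .map { case (value, appearances) => (value, appearances.toDouble / uniqueCount) }
}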
Re: Spark Streaming with Kafka NoClassDefFoundError
I have met similar issues. The reason is probably that in the Spark assembly, spark-streaming-kafka is not included. Currently, I am using Maven to generate a shaded package with all the dependencies. You may try to use sbt assembly to include the dependencies in your jar file. Bill On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote: Hi Akhil, Can you please guide me through this? Because the code I am running already has this in it: [java]

SparkContext sc = new SparkContext();
sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");

Is there something I am missing? Thanks, Dilip On Friday 11 July 2014 12:02 PM, Akhil Das wrote: The easiest fix would be adding the Kafka jars to the SparkContext while creating it. Thanks Best Regards On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote: Hi, I am trying to run a program with Spark streaming using Kafka on a standalone system. These are my details: Spark 1.0.0 hadoop2, Scala 2.10.3. I am trying a simple program using my custom sbt project, but this is the error I am getting:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
  at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
  at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
  at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
  at SimpleJavaApp.main(SimpleJavaApp.java:40)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  ... 11 more

Here is my .sbt file:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
resolvers += "Maven Repository" at "http://central.maven.org/maven2/"

sbt package was successful. I also tried sbt ++2.10.3 package to build it for my Scala version. The problem remains the same. Can anyone help me out here? I've been stuck on this for quite some time now. Thank You, Dilip
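For reference, a hedged sketch of the sbt-assembly route Bill suggests; the plugin version and merge rules below are illustrative rather than a known-good configuration, and the exact keys differ across sbt-assembly releases. Marking the Spark core artifacts as "provided" also keeps the uber jar small, since the cluster already ships those classes; spark-streaming-kafka must not be marked provided, as it is exactly the piece missing from the Spark assembly:

// project/plugins.sbt (version illustrative)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt (sketch)
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"

mergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard // drop conflicting manifests/signatures
  case _ => MergeStrategy.first
}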
Re: Number of executors change during job running
Hi Tathagata, I also tried to pass the number of partitions as a parameter to functions such as groupByKey. It seems the number of executors is around 50 instead of 300, which is the number of executors I specified in the submission script. Moreover, the running time of different executors is skewed. The ideal case is that Spark distributes the data evenly to 300 executors so that the computation can be finished efficiently. I am not sure how to achieve this. Thanks! Bill On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you try setting the number of partitions in all the shuffle-based DStream operations explicitly? It may be the case that the default parallelism (that is, spark.default.parallelism) is probably not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It will show the breakdown of time for each task, including GC times, etc. That might give some indication. TD On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I set the default parallelism as 300 in my configuration file. Sometimes there are more executors in a job. However, it is still slow. And I further observed that most executors take less than 20 seconds but two of them take much longer, such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the group by operation takes more than 3 minutes. Thanks! Bill On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you specifying the number of reducers in all the DStream.ByKey operations? If the number of reducers is not set, then the number of reducers used in the stages can keep changing across batches. TD On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a Spark streaming job running on YARN. It consumes data from Kafka and groups the data by a certain field. The data size is 480k lines per minute, where the batch size is 1 minute. For some batches, the program sometimes takes more than 3 minutes to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specified 300 as the partition number for the groupBy. When I checked the slow stage *combineByKey at ShuffledDStream.scala:42*, there were sometimes 2 executors allocated for this stage. However, during other batches, the executors can be several hundred for the same stage, which means the number of executors for the same operations changes. Does anyone know how Spark allocates the number of executors for different stages and how to increase the efficiency of the task? Thanks! Bill
Re: Spark Streaming with Kafka NoClassDefFoundError
You may try to use this one: https://github.com/sbt/sbt-assembly I had an issue with duplicate files in the uber jar file. But I think this library will assemble the dependencies into a single jar file. Bill On Fri, Jul 11, 2014 at 1:34 AM, Dilip dilip_ram...@hotmail.com wrote: A simple sbt assembly is not working. Is there any other way to include particular jars with the assembly command? Regards, Dilip On Friday 11 July 2014 12:45 PM, Bill Jay wrote: I have met similar issues. The reason is probably that in the Spark assembly, spark-streaming-kafka is not included. Currently, I am using Maven to generate a shaded package with all the dependencies. You may try to use sbt assembly to include the dependencies in your jar file. Bill On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote: Hi Akhil, Can you please guide me through this? Because the code I am running already has this in it: [java]

SparkContext sc = new SparkContext();
sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");

Is there something I am missing? Thanks, Dilip On Friday 11 July 2014 12:02 PM, Akhil Das wrote: The easiest fix would be adding the Kafka jars to the SparkContext while creating it. Thanks Best Regards On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote: Hi, I am trying to run a program with Spark streaming using Kafka on a standalone system. These are my details: Spark 1.0.0 hadoop2, Scala 2.10.3. I am trying a simple program using my custom sbt project, but this is the error I am getting:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
  at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
  at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
  at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
  at SimpleJavaApp.main(SimpleJavaApp.java:40)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  ... 11 more

Here is my .sbt file:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
resolvers += "Maven Repository" at "http://central.maven.org/maven2/"

sbt package was successful.
I also tried sbt ++2.10.3 package to build it for my Scala version. The problem remains the same. Can anyone help me out here? I've been stuck on this for quite some time now. Thank You, Dilip
Re: Number of executors change during job running
Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just a one-to-one mapping, which should not noticeably increase the running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers, and two of the topics have 240k lines each minute. The other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300.

val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString,
  timestampToUTCUnix(data("time").toString),
  timestampToUTCUnix(data("local_time").toString),
  data("id2").toString, data("id3").toString,
  data("log_type").toString, data("sub_log_type").toString))
val timeDiff = 3600L
val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)
val watchTimeFields = filteredFields.map(fields => (fields._1, fields._2, fields._4, fields._5, fields._7))
val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1)).groupByKey(partition)
val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
programDuidNum.saveAsTextFiles(hadoopOutput + "result")

I have been working on this for several days. No findings as to why there are always 2 executors for the groupBy stage. Thanks a lot! Bill On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you show us the program that you are running? If you are setting the number of partitions in the XYZ-ByKey operation as 300, then there should be 300 tasks for that stage, distributed on the 50 executors allocated to your context. However, the data distribution may be skewed, in which case you can use a repartition operation to redistribute the data more evenly (both DStream and RDD have repartition). TD On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I also tried to pass the number of partitions as a parameter to functions such as groupByKey. It seems the number of executors is around 50 instead of 300, which is the number of executors I specified in the submission script. Moreover, the running time of different executors is skewed. The ideal case is that Spark distributes the data evenly to 300 executors so that the computation can be finished efficiently. I am not sure how to achieve this. Thanks! Bill On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you try setting the number of partitions in all the shuffle-based DStream operations explicitly? It may be the case that the default parallelism (that is, spark.default.parallelism) is probably not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It will show the breakdown of time for each task, including GC times, etc. That might give some indication.
TD On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I set the default parallelism as 300 in my configuration file. Sometimes there are more executors in a job. However, it is still slow. And I further observed that most executors take less than 20 seconds but two of them take much longer, such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the group by operation takes more than 3 minutes. Thanks! Bill On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you specifying the number of reducers in all the DStream.ByKey operations? If the number of reducers is not set, then the number of reducers used in the stages can keep changing across batches. TD On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a Spark streaming job running on YARN. It consumes data from Kafka and groups the data by a certain field. The data size is 480k lines per minute, where the batch size is 1 minute. For some batches, the program sometimes takes more than 3 minutes to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specified 300 as the partition number for the groupBy. When I checked the slow stage
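A hedged alternative sketch for the last two steps of Bill's snippet above: the grouped values are only used for value.toSet.size, so the distinct-duid count per program can be computed with reduceByKey instead of groupByKey, avoiding materializing full per-key groups during the slow shuffle stage. Names follow Bill's code:

val dedupedPairs = watchTimeTuples
  .map(fields => ((fields._3, fields._1), 1)) // (program, duid) as the key
  .reduceByKey((a, _) => a, partition)        // keep one record per (program, duid)
val programDuidNum = dedupedPairs
  .map { case ((program, _), _) => (program, 1) }
  .reduceByKey(_ + _, partition)              // distinct duids per program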
Re: Number of executors change during job running
Hi folks, I just ran another job that only received data from Kafka, did some filtering, and then saved the results as text files in HDFS. There was no reducing work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2, although I specified 300 executors in the job submission. As a result, the simple save-file action took more than 2 minutes. Do you have any idea how Spark determines the number of executors for different stages? Thanks! Bill On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just a one-to-one mapping, which should not noticeably increase the running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers, and two of the topics have 240k lines each minute. The other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300.

val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString,
  timestampToUTCUnix(data("time").toString),
  timestampToUTCUnix(data("local_time").toString),
  data("id2").toString, data("id3").toString,
  data("log_type").toString, data("sub_log_type").toString))
val timeDiff = 3600L
val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)
val watchTimeFields = filteredFields.map(fields => (fields._1, fields._2, fields._4, fields._5, fields._7))
val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1)).groupByKey(partition)
val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
programDuidNum.saveAsTextFiles(hadoopOutput + "result")

I have been working on this for several days. No findings as to why there are always 2 executors for the groupBy stage. Thanks a lot! Bill On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you show us the program that you are running? If you are setting the number of partitions in the XYZ-ByKey operation as 300, then there should be 300 tasks for that stage, distributed on the 50 executors allocated to your context. However, the data distribution may be skewed, in which case you can use a repartition operation to redistribute the data more evenly (both DStream and RDD have repartition). TD On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I also tried to pass the number of partitions as a parameter to functions such as groupByKey. It seems the number of executors is around 50 instead of 300, which is the number of executors I specified in the submission script. Moreover, the running time of different executors is skewed. The ideal case is that Spark distributes the data evenly to 300 executors so that the computation can be finished efficiently. I am not sure how to achieve this. Thanks!
Bill

On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you try setting the number of partitions explicitly in all the shuffle-based DStream operations? It may be that the default parallelism (that is, spark.default.parallelism) is not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It will show the breakdown of time for each task, including GC time, etc. That might give some indication. TD
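A minimal sketch of TD's suggestion, assuming a keyed DStream named pairs (a hypothetical stand-in for the keyed stream in the job above): the partition count is passed explicitly to each shuffle-based operation instead of relying on spark.default.parallelism.

import org.apache.spark.streaming.StreamingContext._  // pair-DStream operations (Spark 1.x)

val numPartitions = 300
// Explicit counts on the shuffle-based operations override the default parallelism:
val grouped = pairs.groupByKey(numPartitions)
val summed = pairs.reduceByKey(_ + _, numPartitions)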
Re: Number of executors change during job running
Hi Tathagata, Do you mean that the data is not shuffled until the reduce stage, so groupBy still only uses 2 machines? I think I used repartition(300) after I read the data from Kafka into a DStream, but it did not seem to guarantee that the map or reduce stages would run on 300 machines. I am currently trying to create 100 DStreams with KafkaUtils.createStream and union them. Now the reduce stages use around 80 machines for all the batches. However, this method introduces many DStreams. It would be good if we could control the number of executors in the groupBy operation, because the calculation needs to finish within 1 minute for varying input sizes, based on our production needs. Thanks! Bill

On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Aah, I get it now. That is because the input data stream is replicated on two machines, so by locality the data is processed on those two machines. The map stage on the data therefore uses 2 executors, but the reduce stage after the groupByKey (the saveAsTextFiles) would use 300 tasks. The default parallelism takes effect only when the data is explicitly shuffled around. You can fix this by explicitly repartitioning the data: inputDStream.repartition(partitions). This is covered in the streaming tuning guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving . TD
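A minimal sketch of the two workarounds discussed in this thread, a union of several receivers plus an explicit repartition; the receiver count and partition number are illustrative, and ssc, zkQuorum, group, and topicMap are assumed to be defined as in the earlier snippet.

import org.apache.spark.streaming.kafka.KafkaUtils

// Several receivers so ingestion is not pinned to the two machines holding the replicas:
val kafkaStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
}
val unioned = ssc.union(kafkaStreams)
// Explicit shuffle so the downstream groupBy stage runs with the desired parallelism:
val repartitioned = unioned.repartition(300)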
Re: Use Spark Streaming to update result whenever data come
Tobias, Your help with the problems I have run into has been very valuable. Thanks a lot! Bill

On Wed, Jul 9, 2014 at 6:04 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, good to know you found your bottleneck. Unfortunately, I don't know how to solve this; until now, I have used Spark only with embarrassingly parallel operations such as map or filter. I hope someone else might provide more insight here. Tobias

On Thu, Jul 10, 2014 at 9:57 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tobias, Now I did the repartition and ran the program again, and I found the bottleneck of the whole program. In the streaming job, there is a stage marked as *combineByKey at ShuffledDStream.scala:42* in the Spark UI. This stage is executed repeatedly. However, during some batches, the number of executors allocated to this step is only 2, although I used 300 workers and specified the partition number as 300. In those cases, the program is very slow even though the data being processed is not big. Do you know how to solve this issue? Thanks!
Re: Number of executors change during job running
Hi Tathagata, I set the default parallelism to 300 in my configuration file. Sometimes there are more executors in a job. However, it is still slow. I further observed that most executors take less than 20 seconds, but two of them take much longer, e.g., 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the group-by operation takes more than 3 minutes. Thanks! Bill

On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you specifying the number of reducers in all the DStream.ByKey operations? If the reduce-by-key is not set, then the number of reducers used in the stages can keep changing across batches. TD
Re: Use Spark Streaming to update result whenever data come
Hi Tobias, I was using Spark 0.9 before, and the master I used was yarn-standalone. In Spark 1.0, the master is either yarn-cluster or yarn-client. I am not sure whether that is the reason why more machines do not provide better scalability. What is the difference between these two modes in terms of efficiency? Thanks!

On Tue, Jul 8, 2014 at 5:26 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, do the additional 100 nodes receive any tasks at all? (I don't know which cluster you use, but with Mesos you could check client logs in the web interface.) You might want to try something like repartition(N) or repartition(N*2) (with N the number of your nodes) after you receive your data. Tobias
Number of executors change during job running
Hi all, I have a Spark streaming job running on YARN. It consumes data from Kafka and groups the data by a certain field. The data size is 480k lines per minute, and the batch size is 1 minute. For some batches, the program takes more than 3 minutes to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specified 300 as the partition number for the groupBy. When I checked the slow stage *combineByKey at ShuffledDStream.scala:42*, there were sometimes only 2 executors allocated for this stage. However, during other batches, the executors can number several hundred for the same stage, which means the number of executors for the same operation changes. Does anyone know how Spark allocates the number of executors for different stages and how to make this task more efficient? Thanks! Bill
Re: Use Spark Streaming to update result whenever data come
Hi Tobias, Now I did the repartition and ran the program again, and I found the bottleneck of the whole program. In the streaming job, there is a stage marked as *combineByKey at ShuffledDStream.scala:42* in the Spark UI. This stage is executed repeatedly. However, during some batches, the number of executors allocated to this step is only 2, although I used 300 workers and specified the partition number as 300. In those cases, the program is very slow even though the data being processed is not big. Do you know how to solve this issue? Thanks!

On Wed, Jul 9, 2014 at 5:51 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, I haven't worked with Yarn, but I would try adding a repartition() call after you receive your data from Kafka. I would be surprised if that didn't help.
Join two Spark Streaming
Hi all, I am working on a pipeline that needs to join two Spark streams. The input is a stream of integers, and the output, for each integer, is the number of its appearances divided by the total number of unique integers. Suppose the input is: 1 2 3 1 2 2. There are 3 unique integers and 1 appears twice, so the output for the integer 1 will be: 1 0.67. Since the input comes from a stream, it seems we need to first join the appearance counts of the integers with the total number of unique integers, and then do the division in a map. I am thinking of adding a dummy key to both streams and using join, although a Cartesian product matches the application here better. How can this be done effectively? Thanks! Bill
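A minimal sketch of the dummy-key idea, assuming the input is a DStream[Int] named ints (a hypothetical name): both streams get the same unit key, so the join pairs every per-integer count with the unique-integer total.

import org.apache.spark.streaming.StreamingContext._  // pair-DStream operations (Spark 1.x)

val counts = ints.map(i => (i, 1L)).reduceByKey(_ + _)  // appearances per integer
val uniques = counts.count()                            // number of unique integers per batch
val keyedCounts = counts.map { case (i, c) => ((), (i, c)) }
val keyedTotal = uniques.map(u => ((), u))
val ratios = keyedCounts.join(keyedTotal).map { case (_, ((i, c), u)) =>
  (i, c.toDouble / u)  // for the sample input, integer 1 yields 2.0 / 3 = 0.67
}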
Re: Use Spark Streaming to update result whenever data come
Hi Tobias, Thanks for the suggestion. I have tried adding more nodes, from 300 to 400. It seems the running time did not improve.

On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, can't you just add more nodes in order to speed up the processing? Tobias
Spark-streaming-kafka error
Hi all, I used sbt to package an application that uses spark-streaming-kafka. The packaging succeeded. However, when I submitted it to YARN, the job ran for 10 seconds and there was an error in the log file as follows: Caused by: java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ Does anyone know the reason for this? Thanks! Bill
Re: Spark-streaming-kafka error
Hi Tobias, Currently I do not bundle any dependencies into my application jar. I will try that. Thanks a lot! Bill

On Tue, Jul 8, 2014 at 5:22 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, have you packaged org.apache.spark % spark-streaming-kafka_2.10 % 1.0.0 into your application jar? If I remember correctly, it's not bundled with the downloadable compiled version of Spark. Tobias
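For reference, a sketch of the corresponding build.sbt entries (versions illustrative): spark-core and spark-streaming can stay "provided", since the cluster supplies them, while spark-streaming-kafka has to go into the application's fat jar, e.g., built with the sbt-assembly plugin.

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"  // bundled into the fat jar
)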
Use Spark Streaming to update result whenever data come
Hi all, I have a problem using Spark Streaming to accept input data and update a result. The input data comes from Kafka, and the output is a map that is updated with historical data every minute. My current method is to set the batch size to 1 minute and use foreachRDD to update this map and output it at the end of the foreachRDD function. However, the current issue is that the processing cannot finish within one minute. I am thinking of updating the map whenever new data arrives, instead of doing the update only when the whole RDD arrives. Any idea on how to achieve this with a better running time? Thanks! Bill
Re: Could not compute split, block not found
Hi Tobias, Your explanation makes a lot of sense. Actually, I tried the same program on partial data yesterday. It has been up for around 24 hours and is still running correctly. Thanks! Bill

On Mon, Jun 30, 2014 at 5:53 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, let's say the processing time is t' and the window size t. Spark does not *require* t' < t. In fact, for *temporary* peaks in your streaming data, I think the way Spark handles it is very nice, in particular since 1) it does not mix up the order in which items arrived in the stream, so items from a later window will always be processed later, and 2) an increase in data will not be punished with high load and unresponsive systems, but with disk space consumption instead. However, if all of your windows require t' > t processing time (and it's not because you are waiting, but because you actually do some computation), then you are out of luck, because if you start processing the next window while the previous one is still being processed, you have fewer resources for each, and processing will take even longer. However, if you are only waiting (e.g., for network I/O), then maybe you can employ some asynchronous solution where your tasks return immediately and deliver their result via a callback later? Tobias
Re: Could not compute split, block not found
Hi Tathagata, Yes. The input stream is from Kafka, and my program reads the data, keeps all of it in memory, processes it, and generates the output. Bill

On Mon, Jun 30, 2014 at 11:45 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you by any chance using only memory in the storage level of the input streams? TD
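Given Bill's answer above, a sketch of the fix TD is hinting at: create the Kafka stream with a storage level that spills to disk, so received blocks are kept instead of being dropped under memory pressure (ssc, zkQuorum, group, and topicMap assumed as in the earlier snippets).

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// MEMORY_AND_DISK_SER lets old received blocks spill to disk rather than be evicted:
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
  StorageLevel.MEMORY_AND_DISK_SER)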
slf4j multiple bindings
Hi all, I have an issue with multiple slf4j bindings. My program was running correctly before; I just added a new dependency, kryo, and when I submitted a job, it was killed because of the following error message: *SLF4J: Class path contains multiple SLF4J bindings.* The log said there were three slf4j bindings: spark-assembly-0.9.1-hadoop2.3.0.jar, the hadoop lib, and my own jar file. However, I did not explicitly add slf4j in my pom.xml file. I added exclusions to the kryo dependency, but it did not work. Does anyone have an idea how to fix this issue? Thanks! Regards, Bill
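For what it's worth, the usual shape of such an exclusion, sketched in sbt syntax since the exact coordinates involved aren't given in the thread; "com.example" % "some-library" is a placeholder for whichever dependency drags in the extra binding, and the point is to exclude the concrete binding (e.g., slf4j-log4j12), not slf4j-api itself.

// Placeholder coordinates; exclude the binding jar, not slf4j-api:
libraryDependencies += ("com.example" % "some-library" % "1.0")
  .exclude("org.slf4j", "slf4j-log4j12")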
Re: Could not compute split, block not found
Tobias, Your suggestion is very helpful. I will definitely investigate it. Just curious: suppose the batch size is t seconds. In practice, does Spark always require the program to finish processing the data of t seconds within t seconds of processing time? Can Spark begin to consume the new batch before it finishes processing the previous one? If Spark can do them together, it may save processing time and solve the problem of data piling up. Thanks! Bill

On Mon, Jun 30, 2014 at 4:49 AM, Tobias Pfeiffer t...@preferred.jp wrote: If your batch size is one minute and it takes more than one minute to process, then I guess that's what causes your problem. The processing of the second batch will not start until the processing of the first is finished, which leads to more and more data being stored and waiting for processing; check the attached graph for a visualization of what I think may happen. Can you maybe do something hacky like throwing away a part of the data so that processing time gets below one minute, then check whether you still get that error? Tobias
Re: Could not compute split, block not found
Tobias, Thanks for your help. In my case, the batch size is 1 minute, but it takes my program more than 1 minute to process 1 minute's data. I am not sure whether that is because the unprocessed data piles up. Do you have a suggestion on how to check and solve this? Thanks! Bill

On Sun, Jun 29, 2014 at 7:18 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, were you able to process all information in time, or did maybe some unprocessed data pile up? I think when I saw this once, the reason seemed to be that I had received more data than would fit in memory while waiting for processing, so old data was deleted. When it was time to process that data, it didn't exist any more. Is that a possible reason in your case? Tobias
Could not compute split, block not found
Hi, I am running a Spark streaming job with 1 minute as the batch size. It ran for around 84 minutes and was killed because of an exception with the following information: *java.lang.Exception: Could not compute split, block input-0-1403893740400 not found* Before it was killed, it was able to correctly generate output for each batch. Any help on this will be greatly appreciated. Bill
Re: Spark Streaming RDD transformation
Thanks, Sean! I am currently using foreachRDD to update the global map with the data in each RDD. The reason I want to return the map as an RDD instead of just updating it is that RDD provides many handy methods for output. For example, I want to save the global map into files in HDFS for each batch in the stream. In this case, do you have any suggestion on how Spark can easily allow me to do that? Thanks!

On Thu, Jun 26, 2014 at 12:26 PM, Sean Owen so...@cloudera.com wrote: If you want to transform an RDD to a Map, I assume you have an RDD of pairs. The method collectAsMap() creates a Map from the RDD in this case. Do you mean that you want to update a Map object using data in each RDD? You would use foreachRDD() in that case. Then you can use RDD.foreach to do something like update a global Map object. Not sure if this is what you mean, but SparkContext.parallelize() can be used to make an RDD from a List or Array of objects. That's not really related to streaming or updating a Map, though.

On Thu, Jun 26, 2014 at 1:40 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently working on a project that requires transforming each RDD in a DStream to a Map. Basically, when we get a list of data in each batch, we would like to update a global map. I would like to return the map as a single RDD. I am currently trying to use the function transform; the output will be an RDD of the updated map after each batch. How can I create an RDD from another data structure such as an Int, Map, etc.? Thanks! Bill
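A minimal sketch combining Sean's two suggestions for the HDFS requirement, with hypothetical names (pairStream and the output path): merge each batch into a driver-side map inside foreachRDD, then parallelize the map back into an RDD so saveAsTextFile is available.

import org.apache.spark.SparkContext._  // pair-RDD operations such as collectAsMap

var globalMap = Map.empty[String, Long]
pairStream.foreachRDD { (rdd, time) =>
  // Merge this batch's (key, value) pairs into the global map on the driver:
  globalMap = globalMap ++ rdd.collectAsMap()
  // Re-create an RDD from the map so the usual save methods can be used:
  rdd.sparkContext.parallelize(globalMap.toSeq)
    .saveAsTextFile("hdfs:///output/map-" + time.milliseconds)
}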