Re: Spark Streaming + Kafka failure recovery

2015-05-21 Thread Bill Jay
Hi Cody,

That is clear. Thanks!

Bill

On Tue, May 19, 2015 at 1:27 PM, Cody Koeninger c...@koeninger.org wrote:

 If you checkpoint, the job will start from the successfully consumed
 offsets.  If you don't checkpoint, by default it will start from the
 highest available offset, and you will potentially lose data.

 Is the link I posted, or for that matter the scaladoc, really not clear on
 that point?

 The scaladoc says:

  To recover from driver failures, you have to enable checkpointing in the
 StreamingContext
 http://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/StreamingContext.html.
 The information on consumed offset can be recovered from the checkpoint.
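
 For reference, a minimal sketch of the recovery pattern described above, using
 StreamingContext.getOrCreate together with the Spark 1.3 direct Kafka API (the
 checkpoint directory, batch interval, broker address, and topic name below are
 illustrative placeholders, not values from this thread):

   import kafka.serializer.StringDecoder
   import org.apache.spark.SparkConf
   import org.apache.spark.streaming.{Seconds, StreamingContext}
   import org.apache.spark.streaming.kafka.KafkaUtils

   val checkpointDir = "hdfs:///tmp/streaming-checkpoint"   // illustrative path

   def createContext(): StreamingContext = {
     val conf = new SparkConf().setAppName("kafka-direct-recovery")
     val ssc = new StreamingContext(conf, Seconds(60))
     ssc.checkpoint(checkpointDir)
     val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // illustrative broker
     val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, Set("logs"))
     stream.map(_._2).count().print()   // any output operation
     ssc
   }

   // On restart, the context -- including the consumed offsets -- is rebuilt from the checkpoint.
   val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
   ssc.start()
   ssc.awaitTermination()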

 On Tue, May 19, 2015 at 2:38 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

  If a Spark streaming job stops at 12:01 and I resume the job at 12:02, will
  it still start to consume the data that were produced to Kafka at
  12:01? Or will it just start consuming from the current time?


 On Tue, May 19, 2015 at 10:58 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Have you read
 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
 ?

 1.  There's nothing preventing that.

 2. Checkpointing will give you at-least-once semantics, provided you
 have sufficient kafka retention.  Be aware that checkpoints aren't
 recoverable if you upgrade code.

 On Tue, May 19, 2015 at 12:42 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently using Spark streaming to consume and save logs every
 hour in our production pipeline. The current setting is to run a crontab
 job to check every minute whether the job is still there and if not
 resubmit a Spark streaming job. I am currently using the direct approach
 for Kafka consumer. I have two questions:

  1. In the direct approach, no offset is stored in zookeeper and no
  group id is specified. Can two consumers (one a Spark streaming job and the
  other a Kafka console consumer from the Kafka package) read from the same
  topic from the brokers together (I would like both of them to get all
  messages, i.e. publish-subscribe mode)? What about two Spark streaming jobs
  reading from the same topic?

  2. How to avoid data loss if a Spark job is killed? Does checkpointing
  serve this purpose? The default behavior of Spark streaming is to read the
  latest logs. However, if a job is killed, can the new job resume from what
  was left to avoid losing logs?

 Thanks!

 Bill







Spark Streaming + Kafka failure recovery

2015-05-19 Thread Bill Jay
Hi all,

I am currently using Spark streaming to consume and save logs every hour in
our production pipeline. The current setting is to run a crontab job to
check every minute whether the job is still there and if not resubmit a
Spark streaming job. I am currently using the direct approach for the Kafka
consumer. I have two questions:

1. In the direct approach, no offset is stored in zookeeper and no group id
is specified. Can two consumers (one a Spark streaming job and the other a
Kafka console consumer from the Kafka package) read from the same topic from the
brokers together (I would like both of them to get all messages, i.e.
publish-subscribe mode)? What about two Spark streaming jobs reading from the
same topic?
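
For context, a minimal sketch of the direct-approach stream creation that question 1
refers to (Spark 1.3 API; the broker address and topic name are illustrative, and a
StreamingContext named ssc is assumed). Only brokers are passed, with no group id or
ZooKeeper quorum, so each application tracks its own offsets independently:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // brokers only, no group.id
  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("mytopic"))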

2. How to avoid data loss if a Spark job is killed? Does checkpointing
serve this purpose? The default behavior of Spark streaming is to read the
latest logs. However, if a job is killed, can the new job resume from what
was left to avoid losing logs?

Thanks!

Bill


Re: Spark Streaming + Kafka failure recovery

2015-05-19 Thread Bill Jay
If a Spark streaming job stops at 12:01 and I resume the job at 12:02, will
it still start to consume the data that were produced to Kafka at 12:01? Or
will it just start consuming from the current time?


On Tue, May 19, 2015 at 10:58 AM, Cody Koeninger c...@koeninger.org wrote:

 Have you read
 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ?

 1.  There's nothing preventing that.

 2. Checkpointing will give you at-least-once semantics, provided you have
 sufficient kafka retention.  Be aware that checkpoints aren't recoverable
 if you upgrade code.

 On Tue, May 19, 2015 at 12:42 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently using Spark streaming to consume and save logs every hour
 in our production pipeline. The current setting is to run a crontab job to
 check every minute whether the job is still there and if not resubmit a
 Spark streaming job. I am currently using the direct approach for Kafka
 consumer. I have two questions:

  1. In the direct approach, no offset is stored in zookeeper and no group
  id is specified. Can two consumers (one a Spark streaming job and the other
  a Kafka console consumer from the Kafka package) read from the same topic from
  the brokers together (I would like both of them to get all messages, i.e.
  publish-subscribe mode)? What about two Spark streaming jobs reading from the
  same topic?

  2. How to avoid data loss if a Spark job is killed? Does checkpointing
  serve this purpose? The default behavior of Spark streaming is to read the
  latest logs. However, if a job is killed, can the new job resume from what
  was left to avoid losing logs?

 Thanks!

 Bill





Partition number of Spark Streaming Kafka receiver-based approach

2015-05-18 Thread Bill Jay
Hi all,

I am reading the docs of the receiver-based Kafka consumer. The last parameter
of KafkaUtils.createStream is the per-topic number of Kafka partitions to
consume. My question is: does the number of partitions per topic in this
parameter need to match the number of partitions in Kafka?

For example, I have two topics, topic1 with 3 partitions and topic2 with 4
partitions.

If I specify 2 for topic1 and 3 for topic2 and feed them to the
createStream function, will there be data loss? Or will it just be an
inefficiency?
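
For reference, a minimal sketch of the receiver-based call in question (a
StreamingContext named ssc is assumed; the ZooKeeper address, group id, topics, and
per-topic values are illustrative). Going by the KafkaWordCount example, the per-topic
value sets how many consumer threads the receiver uses for that topic, rather than the
topic's actual Kafka partition count:

  import org.apache.spark.streaming.kafka.KafkaUtils

  // topic -> number of consumer threads inside the receiver (illustrative values)
  val topicThreads = Map("topic1" -> 2, "topic2" -> 3)
  val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", topicThreads)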

Thanks!

Bill


Re: Too many open files when using Spark to consume messages from Kafka

2015-04-30 Thread Bill Jay
The data ingestion is in the outermost portion of the foreachRDD block. Although
I now close the JDBC statement, the same exception happened again. It
seems it is not related to the data ingestion part.

On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger c...@koeninger.org wrote:

 Use lsof to see what files are actually being held open.

 That stacktrace looks to me like it's from the driver, not executors.
 Where in foreach is it being called?  The outermost portion of foreachRDD
 runs in the driver, the innermost portion runs in the executors.  From the
 docs:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html

 dstream.foreachRDD { rdd =>
   val connection = createNewConnection()  // executed at the driver
   rdd.foreach { record =>
     connection.send(record) // executed at the worker
   }
 }
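
  The same guide also shows a connection-per-partition variant, sketched below with the
  same hypothetical createNewConnection() helper, so the connection is created on the
  worker once per partition rather than once per record:

  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      val connection = createNewConnection()   // executed at the worker, once per partition
      partitionOfRecords.foreach(record => connection.send(record))
      connection.close()
    }
  }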


 @td I've specifically looked at kafka socket connections for the standard
 1.3 code vs my branch that has cached connections.  The standard
 non-caching code has very short lived connections.  I've had jobs running
 for a month at a time, including ones writing to mysql.  Not saying it's
 impossible, but I'd think we need some evidence before speculating this has
 anything to do with it.


 On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

  This function is called in foreachRDD. I think it should be running in
  the executors. I added statement.close() to the code and it is running. I
  will let you know if this fixes the issue.



 On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

  If you don't get too many open files without calling ingestToMysql(),
  the problem is likely to be in ingestToMysql().
  If you get the problem even without calling ingestToMysql(), then the
  problem may be in Kafka. If the problem is occurring in the driver, then it's
  the DirectKafkaInputDStream code. If the problem is occurring in the
  executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

    def ingestToMysql(data: Array[String]) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        if (conn != null) {
          conn.close
        }
      }
    }

 I am not sure whether the leakage originates from Kafka connector or
 the sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
  wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka
 in the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark
 log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException:
 Too many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-30 Thread Bill Jay
I terminated the old job and have now started a new one. Currently, the Spark
streaming job has been running for 2 hours and when I use lsof, I do not
see many files related to the Spark job.

BTW, I am running Spark streaming in local[2] mode. The batch interval is 5
seconds and there are around 50 lines to read in each batch.

On Thu, Apr 30, 2015 at 11:15 AM, Cody Koeninger c...@koeninger.org wrote:

 Did you use lsof to see what files were opened during the job?

 On Thu, Apr 30, 2015 at 1:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

  The data ingestion is in the outermost portion of the foreachRDD block. Although
  I now close the JDBC statement, the same exception happened again. It
  seems it is not related to the data ingestion part.

 On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Use lsof to see what files are actually being held open.

 That stacktrace looks to me like it's from the driver, not executors.
 Where in foreach is it being called?  The outermost portion of foreachRDD
 runs in the driver, the innermost portion runs in the executors.  From the
 docs:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html

  dstream.foreachRDD { rdd =>
    val connection = createNewConnection()  // executed at the driver
    rdd.foreach { record =>
      connection.send(record) // executed at the worker
    }
  }


 @td I've specifically looked at kafka socket connections for the
 standard 1.3 code vs my branch that has cached connections.  The standard
 non-caching code has very short lived connections.  I've had jobs running
 for a month at a time, including ones writing to mysql.  Not saying it's
 impossible, but I'd think we need some evidence before speculating this has
 anything to do with it.


 On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

  This function is called in foreachRDD. I think it should be running in
  the executors. I added statement.close() to the code and it is running. I
  will let you know if this fixes the issue.



 On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com
 wrote:

  Is the function ingestToMysql running on the driver or on the
  executors? Accordingly you can try debugging while running in a distributed
  manner, with and without calling the function.

  If you don't get too many open files without calling ingestToMysql(),
  the problem is likely to be in ingestToMysql().
  If you get the problem even without calling ingestToMysql(), then the
  problem may be in Kafka. If the problem is occurring in the driver, then it's
  the DirectKafkaInputDStream code. If the problem is occurring in the
  executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
  wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

    def ingestToMysql(data: Array[String]) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        if (conn != null) {
          conn.close
        }
      }
    }

 I am not sure whether the leakage originates from Kafka connector or
 the sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
 bill.jaypeter...@gmail.com wrote:

 Hi all,

 I am using the direct approach to receive real-time data from
 Kafka in the following link:


 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark
 log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for
 time 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException:
 Too many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too 
 many
 open files, java.io.IOException: Too

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
Thanks for the suggestion. I ran the command and the limit is 1024.

Based on my understanding, the connector to Kafka should not open so many
files. Do you think there is possible socket leakage? BTW, in every batch
which is 5 seconds, I output some results to mysql:

  def ingestToMysql(data: Array[String]) {
    val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
    var sql = "insert into loggingserver1 values "
    data.foreach(line => sql += line)
    sql = sql.dropRight(1)
    sql += ";"
    logger.info(sql)
    var conn: java.sql.Connection = null
    try {
      conn = DriverManager.getConnection(url)
      val statement = conn.createStatement()
      statement.executeUpdate(sql)
    } catch {
      case e: Exception => logger.error(e.getMessage())
    } finally {
      if (conn != null) {
        conn.close
      }
    }
  }

I am not sure whether the leakage originates from Kafka connector or the
sql connections.

Bill

On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream

Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
Hi all,

I am using the direct approach to receive real-time data from Kafka in the
following link:

https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


My code follows the word count direct example:

https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



After around 12 hours, I got the following error messages in Spark log:

15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
143033869 ms
org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many
open files, java.io.IOException: Too many open files, java.io.IOException:
Too many open files, java.io.IOException: Too many open files,
java.io.IOException: Too many open files)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
This function is called in foreachRDD. I think it should be running in the
executors. I add the statement.close() in the code and it is running. I
will let you know if this fixes the issue.



On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

 If you dont get too many open files without calling ingestToMysql(), the
 problem is likely to be in ingestToMysql().
 If you get the problem even without calling ingestToMysql(), then the
 problem may be in Kafka. If the problem is occuring in the driver, then its
 the DirecKafkaInputDStream code. If the problem is occurring in the
 executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

    def ingestToMysql(data: Array[String]) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        if (conn != null) {
          conn.close
        }
      }
    }

 I am not sure whether the leakage originates from Kafka connector or the
 sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1

Re: Lifecycle of RDD in spark-streaming

2014-11-27 Thread Bill Jay
Gerard,

That is a good observation. However, the strange thing I see is that if I use
MEMORY_AND_DISK_SER, the job fails even earlier. In my case, it takes 10
seconds to process the data of each batch, whose interval is one minute. It fails
after 10 hours with the "cannot compute split" error.

Bill

On Thu, Nov 27, 2014 at 3:31 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi TD,

  We also struggled with this error for a long while.  The recurring
  scenario is when the job takes longer to compute than the batch interval and
  a backlog starts to pile up.
  Hint: if the DStream storage level is set to MEMORY_ONLY_SER and memory runs
  out, then you will get a 'Cannot compute split: Missing block ...'.

  What I don't know ATM is whether the new data is dropped or the LRU policy
  removes data in the system in favor of the incoming data.
  In any case, the DStream processing still thinks the data is there at the
  moment the job is scheduled to run, and it fails to run.

  In our case, changing the storage level to MEMORY_AND_DISK_SER solved the problem
  and our streaming job can get through tough times without issues.
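
  For reference, a minimal sketch of raising the storage level on a receiver-based
  Kafka stream via the createStream overload that takes a StorageLevel (a
  StreamingContext named ssc is assumed; the ZooKeeper address, group, and topic are
  illustrative):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    val lines = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group",
      Map("logs" -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)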

 Regularly checking 'scheduling delay' and 'total delay' on the Streaming
 tab in the UI is a must.  (And soon we will have that on the metrics report
 as well!! :-) )

 -kr, Gerard.



 On Thu, Nov 27, 2014 at 8:14 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi TD,

  I am using Spark Streaming to consume data from Kafka, do some
  aggregation, and ingest the results into RDS. I do use foreachRDD in the
  program. I am planning to use Spark streaming in our production pipeline
  and it performs well in generating the results. Unfortunately, we plan to
  run the production pipeline 24/7, and the Spark streaming job usually fails after
  8-20 hours due to the "cannot compute split" exception. In other cases, the
  Kafka receiver fails and the program runs without producing any
  result.

 In my pipeline, the batch size is 1 minute and the data volume per minute
 from Kafka is 3G. I have been struggling with this issue for more than a
 month. It will be great if you can provide some solutions for this. Thanks!

 Bill


 On Wed, Nov 26, 2014 at 5:35 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

  Can you elaborate on the usage pattern that leads to "cannot compute
  split"? Are you using the RDDs generated by DStream, outside the
 DStream logic? Something like running interactive Spark jobs
 (independent of the Spark Streaming ones) on RDDs generated by
 DStreams? If that is the case, what is happening is that Spark
 Streaming is not aware that some of the RDDs (and the raw input data
 that it will need) will be used by Spark jobs unrelated to Spark
 Streaming. Hence Spark Streaming will actively clear off the raw data,
 leading to failures in the unrelated Spark jobs using that data.

  In case this is your use case, the cleanest way to solve this is by
  asking Spark Streaming to remember stuff for longer, by using
  streamingContext.remember(duration). This will ensure that Spark
 Streaming will keep around all the stuff for at least that duration.
 Hope this helps.
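
  A one-line sketch of that call (the duration is illustrative, and a StreamingContext
  named ssc is assumed):

    import org.apache.spark.streaming.Minutes
    ssc.remember(Minutes(5))   // keep generated RDDs around for at least 5 minutes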

 TD

 On Wed, Nov 26, 2014 at 5:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:
   Just add one more point. If Spark streaming knows when the RDD will not be
   used any more, I believe Spark will not try to retrieve data it will not use
   any more. However, in practice, I often encounter the error of "cannot
   compute split". Based on my understanding, this is because Spark cleared
   out data that would be used again. In my case, the data volume is much
   smaller (30M/s, the batch size is 60 seconds) than the memory (20G each
   executor). If Spark only keeps RDDs that are in use, I expect that this
   error may not happen.
 
  Bill
 
  On Wed, Nov 26, 2014 at 4:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com
  wrote:
 
  Let me further clarify Lalit's point on when RDDs generated by
  DStreams are destroyed, and hopefully that will answer your original
  questions.
 
   1.  How spark (streaming) guarantees that all the actions are taken on
   each input rdd/batch.
   This isn't hard! By the time you call streamingContext.start(), you
   have already set up the output operations (foreachRDD, saveAs***Files,
   etc.) that you want to do with the DStream. There are RDD actions
   inside the DStream output operations that need to be done every batch
   interval. So all the system does is this - after every batch
   interval, put all the output operations (that will call RDD actions)
   in a job queue, and then keep executing stuff in the queue. If there
   is any failure in running the jobs, the streaming context will stop.
 
   2.  How does spark determine that the life-cycle of a rdd is
   complete. Is there any chance that a RDD will be cleaned out of ram
   before all actions are taken on them?
   Spark Streaming knows when all the processing related to batch T
   has been completed. And also it keeps track of how much time of the
   previous RDDs does

Re: Lifecycle of RDD in spark-streaming

2014-11-26 Thread Bill Jay
Just add one more point. If Spark streaming knows when the RDD will not be
used any more, I believe Spark will not try to retrieve data it will not
use any more. However, in practice, I often encounter the error of "cannot
compute split". Based on my understanding, this is because Spark cleared
out data that would be used again. In my case, the data volume is much
smaller (30M/s, the batch size is 60 seconds) than the memory (20G each
executor). If Spark only keeps RDDs that are in use, I expect that this
error may not happen.

Bill

On Wed, Nov 26, 2014 at 4:02 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Let me further clarify Lalit's point on when RDDs generated by
 DStreams are destroyed, and hopefully that will answer your original
 questions.

 1.  How spark (streaming) guarantees that all the actions are taken on
 each input rdd/batch.
 This isn't hard! By the time you call streamingContext.start(), you
 have already set up the output operations (foreachRDD, saveAs***Files,
 etc.) that you want to do with the DStream. There are RDD actions
 inside the DStream output operations that need to be done every batch
 interval. So all the system does is this - after every batch
 interval, put all the output operations (that will call RDD actions)
 in a job queue, and then keep executing stuff in the queue. If there
 is any failure in running the jobs, the streaming context will stop.

 2.  How does spark determine that the life-cycle of a rdd is
 complete. Is there any chance that a RDD will be cleaned out of ram
 before all actions are taken on them?
 Spark Streaming knows when all the processing related to batch T
 has been completed. It also keeps track of how much of the
 previous RDDs it needs to remember and keep around in the cache
 based on what DStream operations have been done. For example, if you
 are using a 1-minute window, the system knows that it needs to keep
 around at least the last 1 minute of data in memory. Accordingly, it
 cleans up the input data (actively unpersisted), and cached RDDs
 (simply dereferenced from DStream metadata, and then Spark unpersists
 them as the RDD objects get garbage-collected by the JVM).
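
 For illustration, a window as in the example above: with a 1-minute window over a
 10-second batch interval, Spark Streaming has to keep at least the last minute of
 input around (durations are illustrative, the slide must be a multiple of the batch
 interval, and a DStream named lines is assumed):

   import org.apache.spark.streaming.{Minutes, Seconds}
   val lastMinute = lines.window(Minutes(1), Seconds(10))   // assumes a DStream named `lines`
   lastMinute.count().print()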

 TD



 On Wed, Nov 26, 2014 at 10:10 AM, tian zhang
 tzhang...@yahoo.com.invalid wrote:
  I have found this paper seems to answer most of questions about life
  duration.
 
 https://www.cs.berkeley.edu/~matei/papers/2012/hotcloud_spark_streaming.pdf
 
  Tian
 
 
  On Tuesday, November 25, 2014 4:02 AM, Mukesh Jha 
 me.mukesh@gmail.com
  wrote:
 
 
  Hey Experts,
 
  I wanted to understand in detail about the lifecycle of rdd(s) in a
  streaming app.
 
  From my current understanding
  - rdd gets created out of the realtime input stream.
   - Transform functions are applied in a lazy fashion on the RDD to
   transform it into other rdd(s).
  - Actions are taken on the final transformed rdds to get the data out of
 the
  system.
 
   Also rdd(s) are stored in the cluster's RAM (disk if so configured) and are
   cleaned in LRU fashion.
 
  So I have the following questions on the same.
  - How spark (streaming) guarantees that all the actions are taken on each
  input rdd/batch.
   - How does spark determine that the life-cycle of a rdd is complete. Is
   there any chance that a RDD will be cleaned out of ram before all actions
   are taken on them?
 
   Thanks in advance for all your help. Also, I'm relatively new to scala &
   spark so pardon me in case these are naive questions/assumptions.
 
  --
   Thanks & Regards,
  Mukesh Jha
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Lifecycle of RDD in spark-streaming

2014-11-26 Thread Bill Jay
Hi TD,

I am using Spark Streaming to consume data from Kafka, do some
aggregation, and ingest the results into RDS. I do use foreachRDD in the
program. I am planning to use Spark streaming in our production pipeline
and it performs well in generating the results. Unfortunately, we plan to
run the production pipeline 24/7, and the Spark streaming job usually fails after
8-20 hours due to the "cannot compute split" exception. In other cases, the
Kafka receiver fails and the program runs without producing any
result.

In my pipeline, the batch size is 1 minute and the data volume per minute
from Kafka is 3G. I have been struggling with this issue for more than a
month. It will be great if you can provide some solutions for this. Thanks!

Bill


On Wed, Nov 26, 2014 at 5:35 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you elaborate on the usage pattern that leads to "cannot compute
 split"? Are you using the RDDs generated by DStream, outside the
 DStream logic? Something like running interactive Spark jobs
 (independent of the Spark Streaming ones) on RDDs generated by
 DStreams? If that is the case, what is happening is that Spark
 Streaming is not aware that some of the RDDs (and the raw input data
 that it will need) will be used by Spark jobs unrelated to Spark
 Streaming. Hence Spark Streaming will actively clear off the raw data,
 leading to failures in the unrelated Spark jobs using that data.

 In case this is your use case, the cleanest way to solve this is by
 asking Spark Streaming to remember stuff for longer, by using
 streamingContext.remember(duration). This will ensure that Spark
 Streaming will keep around all the stuff for at least that duration.
 Hope this helps.

 TD

 On Wed, Nov 26, 2014 at 5:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:
   Just add one more point. If Spark streaming knows when the RDD will not be
   used any more, I believe Spark will not try to retrieve data it will not use
   any more. However, in practice, I often encounter the error of "cannot
   compute split". Based on my understanding, this is because Spark cleared
   out data that would be used again. In my case, the data volume is much
   smaller (30M/s, the batch size is 60 seconds) than the memory (20G each
   executor). If Spark only keeps RDDs that are in use, I expect that this
   error may not happen.
 
  Bill
 
  On Wed, Nov 26, 2014 at 4:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com
  wrote:
 
  Let me further clarify Lalit's point on when RDDs generated by
  DStreams are destroyed, and hopefully that will answer your original
  questions.
 
  1.  How spark (streaming) guarantees that all the actions are taken on
  each input rdd/batch.
   This isn't hard! By the time you call streamingContext.start(), you
   have already set up the output operations (foreachRDD, saveAs***Files,
   etc.) that you want to do with the DStream. There are RDD actions
   inside the DStream output operations that need to be done every batch
   interval. So all the system does is this - after every batch
   interval, put all the output operations (that will call RDD actions)
   in a job queue, and then keep executing stuff in the queue. If there
   is any failure in running the jobs, the streaming context will stop.
 
   2.  How does spark determine that the life-cycle of a rdd is
   complete. Is there any chance that a RDD will be cleaned out of ram
   before all actions are taken on them?
   Spark Streaming knows when all the processing related to batch T
   has been completed. It also keeps track of how much of the
   previous RDDs it needs to remember and keep around in the cache
   based on what DStream operations have been done. For example, if you
   are using a 1-minute window, the system knows that it needs to keep
   around at least the last 1 minute of data in memory. Accordingly, it
   cleans up the input data (actively unpersisted), and cached RDDs
   (simply dereferenced from DStream metadata, and then Spark unpersists
   them as the RDD objects get garbage-collected by the JVM).
 
  TD
 
 
 
  On Wed, Nov 26, 2014 at 10:10 AM, tian zhang
  tzhang...@yahoo.com.invalid wrote:
   I have found this paper seems to answer most of questions about life
   duration.
  
  
 https://www.cs.berkeley.edu/~matei/papers/2012/hotcloud_spark_streaming.pdf
  
   Tian
  
  
   On Tuesday, November 25, 2014 4:02 AM, Mukesh Jha
   me.mukesh@gmail.com
   wrote:
  
  
   Hey Experts,
  
   I wanted to understand in detail about the lifecycle of rdd(s) in a
   streaming app.
  
   From my current understanding
   - rdd gets created out of the realtime input stream.
    - Transform functions are applied in a lazy fashion on the RDD to
    transform it into other rdd(s).
    - Actions are taken on the final transformed rdds to get the data out of
    the system.

    Also rdd(s) are stored in the cluster's RAM (disk if so configured) and
    are cleaned in LRU fashion.
  
   So I have the following questions

Re: Error when Spark streaming consumes from Kafka

2014-11-23 Thread Bill Jay
Hi Dibyendu,

Thank you for the answer. I will try the Spark-Kafka consumer.

Bill

On Sat, Nov 22, 2014 at 9:15 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 I believe this has something to do with how the Kafka High Level API manages
 consumers within a consumer group and how it re-balances during failure. You
 can find some mention of it in this Kafka wiki.

 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design

 Due to various issues in Kafka High Level APIs, Kafka is moving the High
 Level Consumer API to a complete new set of API in Kafka 0.9.

 Other than this co-ordination issue, the High Level consumer also has data
 loss issues.

 You can probably try this Spark-Kafka consumer, which uses the Low Level Simple
 consumer API, which is more performant and has no data loss scenarios.

 https://github.com/dibbhatt/kafka-spark-consumer

 Regards,
 Dibyendu

 On Sun, Nov 23, 2014 at 2:13 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using Spark to consume from Kafka. However, after the job has run
 for several hours, I saw the following failure of an executor:

 kafka.common.ConsumerRebalanceFailedException: 
 group-1416624735998_ip-172-31-5-242.ec2.internal-1416648124230-547d2c31 
 can't rebalance after 4 retries
 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
 
 kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
 
 kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
 
 kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
 
 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
 
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
 
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)


 Does anyone know the reason for this exception? Thanks!

 Bill





Error when Spark streaming consumes from Kafka

2014-11-22 Thread Bill Jay
Hi all,

I am using Spark to consume from Kafka. However, after the job has run for
several hours, I saw the following failure of an executor:

kafka.common.ConsumerRebalanceFailedException:
group-1416624735998_ip-172-31-5-242.ec2.internal-1416648124230-547d2c31
can't rebalance after 4 retries

kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)

kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)

kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)

kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)

org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)

org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)

org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)

org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)

org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)


Does anyone know the reason for this exception? Thanks!

Bill


Re: Spark streaming cannot receive any message from Kafka

2014-11-18 Thread Bill Jay
Hi Jerry,

I looked at the KafkaUtils.createStream API and found that
spark.default.parallelism is actually specified in SparkConf instead. I do not
remember the exact stack trace of the exception, but the exception was incurred
when createStream was called if we did not specify
spark.default.parallelism. The error message basically shows a failure to parse an
empty string into an Int if spark.default.parallelism is not specified.
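
For reference, a minimal sketch of setting that property on the SparkConf (the app name
and value are illustrative):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("kafka-streaming")
    .set("spark.default.parallelism", "8")   // illustrative value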

Bill

On Mon, Nov 17, 2014 at 4:45 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi Bill,



  Would you mind describing what you found a little more specifically? I’m
  not sure there’s a parameter in KafkaUtils.createStream where you can specify
  the spark parallelism. Also, what is the exception stack?



 Thanks

 Jerry



 *From:* Bill Jay [mailto:bill.jaypeter...@gmail.com]
 *Sent:* Tuesday, November 18, 2014 2:47 AM
 *To:* Helena Edelson
 *Cc:* Jay Vyas; u...@spark.incubator.apache.org; Tobias Pfeiffer; Shao,
 Saisai

 *Subject:* Re: Spark streaming cannot receive any message from Kafka



 Hi all,



  I found the reason for this issue. It seems in the new version, if I do not
  specify spark.default.parallelism in KafkaUtils.createStream, there will be
  an exception at the kafka stream creation stage. In the previous
  versions, it seems Spark would use the default value.



 Thanks!



 Bill



 On Thu, Nov 13, 2014 at 5:00 AM, Helena Edelson 
 helena.edel...@datastax.com wrote:

 I encounter no issues with streaming from kafka to spark in 1.1.0. Do you
 perhaps have a version conflict?

 Helena

 On Nov 13, 2014 12:55 AM, Jay Vyas jayunit100.apa...@gmail.com wrote:

  Yup, very important that n > 1 for spark streaming jobs. If local, use
  local[2]



  The thing to remember is that your spark receiver will take a thread to
  itself and produce data, so you need another thread to consume it.



  In a cluster manager like yarn or mesos, the word "thread" is not used
  anymore; I guess it has a different meaning - you need 2 or more free compute
  slots, and that should be guaranteed by looking to see how many free node
  managers are running etc.


 On Nov 12, 2014, at 7:53 PM, Shao, Saisai saisai.s...@intel.com wrote:

   Did you configure Spark master as local? It should be local[n], n > 1,
  for local mode. Besides, there’s a Kafka wordcount example in the Spark Streaming
  examples; you can try that. I’ve tested with the latest master, it’s OK.
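
  A minimal sketch of the local-mode master setting being described (the app name is
  illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("local[2]")    // at least 2 threads: one for the receiver, one for processing
      .setAppName("kafka-wordcount")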



 Thanks

 Jerry



 *From:* Tobias Pfeiffer [mailto:t...@preferred.jp t...@preferred.jp]
 *Sent:* Thursday, November 13, 2014 8:45 AM
 *To:* Bill Jay
 *Cc:* u...@spark.incubator.apache.org
 *Subject:* Re: Spark streaming cannot receive any message from Kafka



 Bill,



    However, now that I am using Spark 1.1.0, the Spark streaming job
  cannot receive any messages from Kafka. I have not made any change to the
  code.



 Do you see any suspicious messages in the log output?



 Tobias







Spark streaming: java.io.IOException: Version Mismatch (Expected: 28, Received: 18245 )

2014-11-18 Thread Bill Jay
Hi all,

I am running a Spark Streaming job. It was able to produce the correct
results up to some point. Later on, the job was still running but producing
no results. I checked the Spark streaming UI and found that 4 tasks of a
stage had failed.

The error messages showed that Job aborted due to stage failure: Task 0 in
stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0
(TID 400048, ip-172-31-13-130.ec2.internal): ExecutorLostFailure (executor
lost)
Driver stacktrace:

I further clicked into the stage and found that the 4 executors running the stage had
the error message:

ExecutorLostFailure (executor lost)


The stage that failed was actually runJob at ReceiverTracker.scala:275
http://ec2-54-172-118-237.compute-1.amazonaws.com:9046/proxy/application_1415902783817_0019/stages/stage?id=2attempt=0,
which is the stage that keeps receiving messages from Kafka. I guess that is
why the job does not produce results any more.

To investigate it, I logged into one of the executor machines and checked
the hadoop log. The log file contains a lot of exception messages:

*java.io.IOException: Version Mismatch (Expected: 28, Received: 18245 )*


This streaming job reads from Kafka and produces aggregation results.
After this stage failure, the job is still running but there is no data
shuffle, as seen in the Spark UI.

The amount of time this job can run correctly varies from job to job.
Does anyone have an idea why this Spark Streaming job had this exception,
and why it cannot recover from the stage failure?

Thanks!

Bill


Re: Spark streaming cannot receive any message from Kafka

2014-11-17 Thread Bill Jay
Hi all,

I found the reason for this issue. It seems in the new version, if I do not
specify spark.default.parallelism in KafkaUtils.createStream, there will be
an exception at the kafka stream creation stage. In the previous
versions, it seems Spark would use the default value.

Thanks!

Bill

On Thu, Nov 13, 2014 at 5:00 AM, Helena Edelson helena.edel...@datastax.com
 wrote:

 I encounter no issues with streaming from kafka to spark in 1.1.0. Do you
 perhaps have a version conflict?

 Helena
 On Nov 13, 2014 12:55 AM, Jay Vyas jayunit100.apa...@gmail.com wrote:

  Yup, very important that n > 1 for spark streaming jobs. If local, use
  local[2]

 The thing to remember is that your spark receiver will take a thread to
 itself and produce data , so u need another thread to consume it .

  In a cluster manager like yarn or mesos, the word "thread" is not used
  anymore; I guess it has a different meaning - you need 2 or more free compute
  slots, and that should be guaranteed by looking to see how many free node
  managers are running etc.

 On Nov 12, 2014, at 7:53 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

   Did you configure Spark master as local? It should be local[n], n > 1,
  for local mode. Besides, there’s a Kafka wordcount example in the Spark Streaming
  examples; you can try that. I’ve tested with the latest master, it’s OK.



 Thanks

 Jerry



 *From:* Tobias Pfeiffer [mailto:t...@preferred.jp t...@preferred.jp]
 *Sent:* Thursday, November 13, 2014 8:45 AM
 *To:* Bill Jay
 *Cc:* u...@spark.incubator.apache.org
 *Subject:* Re: Spark streaming cannot receive any message from Kafka



 Bill,



    However, now that I am using Spark 1.1.0, the Spark streaming
  job cannot receive any messages from Kafka. I have not made any change to
  the code.



 Do you see any suspicious messages in the log output?



 Tobias






Spark streaming cannot receive any message from Kafka

2014-11-12 Thread Bill Jay
Hi all,

I have a Spark streaming job which constantly receives messages from Kafka.
I was using Spark 1.0.2 and the job had been running for a month. However,
now that I am using Spark 1.1.0, the Spark streaming job cannot
receive any messages from Kafka. I have not made any change to the code.
Below please find the code snippet for the Kafkaconsumer:

    var Array(zkQuorum, topics, mysqlTable) = args
    val currentTime: Long = System.currentTimeMillis
    val group = "my-group-" + currentTime.toString
    println(topics)
    println(zkQuorum)
    val numThreads = 1
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    ssc = new StreamingContext(conf, Seconds(batch))
    ssc.checkpoint(hadoopOutput + "checkpoint")
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
      topicMap).map(_._2)
    val lineCounts =
      lines.count.saveAsTextFiles(hadoopOutput + "counts/result")


I checked the values in topics and zkQuorum and they are correct. I use the
same information with kafka-console-consumer and it works correctly.

Does anyone know the reason? Thanks!

Bill


Re: Spark streaming cannot receive any message from Kafka

2014-11-12 Thread Bill Jay
Hi all,

Thanks for the information. I am running Spark streaming in a YARN cluster
and the configuration should be correct. I followed the KafkaWordCount example
to write the current code three months ago, and it has been working for
several months. The messages are in JSON format. Actually, this code worked a
few days ago, but now it is not working. Below please find my spark-submit
script:


SPARK_BIN=/home/hadoop/spark/bin/
$SPARK_BIN/spark-submit \
 --class com.test \
 --master yarn-cluster \
 --deploy-mode cluster \
 --verbose \
 --driver-memory 20G \
 --executor-memory 20G \
 --executor-cores 6 \
 --num-executors $2 \
 $1 $3 $4 $5

Thanks!

 Bill

On Wed, Nov 12, 2014 at 4:53 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Did you configure the Spark master as local? It should be local[n], n > 1,
  for local mode. Besides, there’s a Kafka wordcount example in the Spark Streaming
  examples; you can try that. I’ve tested with the latest master, and it’s OK.



 Thanks

 Jerry



 *From:* Tobias Pfeiffer [mailto:t...@preferred.jp]
 *Sent:* Thursday, November 13, 2014 8:45 AM
 *To:* Bill Jay
 *Cc:* u...@spark.incubator.apache.org
 *Subject:* Re: Spark streaming cannot receive any message from Kafka



 Bill,



    However, now that I am using Spark 1.1.0, the Spark streaming job
  cannot receive any messages from Kafka. I have not made any change to the
  code.



 Do you see any suspicious messages in the log output?



 Tobias





Spark streaming job failed due to java.util.concurrent.TimeoutException

2014-11-03 Thread Bill Jay
Hi all,

I have a Spark streaming job that consumes data from Kafka and performs
some simple operations on the data. This job runs in an EMR cluster with
10 nodes. The batch size I use is 1 minute and it takes around 10 seconds
to generate the results, which are inserted into a MySQL database. However,
after more than 2 days, the job failed with a list of the following error
messages in the log:


java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

Does anyone know the reason? Thanks!

Bill


Re: combineByKey at ShuffledDStream.scala

2014-07-23 Thread Bill Jay
The streaming program contains the following main stages:

1. Receive data from Kafka.
2. Preprocess the data. These are all map and filter stages.
3. Group by a field.
4. Process the groupBy results using map. Inside this processing, I use
collect and count.

Thanks!

Bill


On Tue, Jul 22, 2014 at 10:05 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Can you give an idea of the streaming program? Rest of the transformation
 you are doing on the input streams?


 On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently running a Spark Streaming program, which consumes data
 from Kafka and does a group by operation on the data. I am trying to optimize
 the running time of the program because it looks slow to me. It seems the
 stage named:

 * combineByKey at ShuffledDStream.scala:42 *

 always takes the longest running time. And if I open this stage, I only
 see two executors on this stage. Does anyone have an idea what this stage
 does and how to increase the speed for this stage? Thanks!

 Bill





Re: Spark Streaming: no job has started yet

2014-07-23 Thread Bill Jay
The code is pretty long, but the main idea is to consume from Kafka,
preprocess the data, and group by a field. I use multiple DStreams to add
parallelism to the consumer. It seems that when the number of DStreams is
large, this happens often.

Thanks,

Bill


On Tue, Jul 22, 2014 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Can you paste the piece of code?

 Thanks
 Best Regards


 On Wed, Jul 23, 2014 at 1:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am running a spark streaming job. The job hangs on one stage, which
 shows as follows:

 Details for Stage 4
 Summary Metrics: No tasks have started yet
 Tasks: No tasks have started yet



 Does anyone have an idea on this?

 Thanks!

 Bill
 Bill





Get Spark Streaming timestamp

2014-07-23 Thread Bill Jay
Hi all,

I have a question regarding Spark streaming. When we use the
saveAsTextFiles function and my batch is 60 seconds, Spark will generate a
series of files such as:

result-140614896, result-140614802, result-140614808, etc.

I think this is the timestamp for the beginning of each batch. How can we
extract the variable and use it in our code? Thanks!

Bill


Re: Get Spark Streaming timestamp

2014-07-23 Thread Bill Jay
Hi Tobias,

It seems this parameter is an input to the function. What I am expecting is
output from a function that tells me the starting or ending time of the
batch. For instance, if I use saveAsTextFiles, it seems DStream will
generate a batch every minute and the starting time is a complete minute
(the batch size is 60 seconds). Thanks!

Bill


On Wed, Jul 23, 2014 at 6:56 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 Spark Streaming's DStream provides overloaded methods for transform() and
 foreachRDD() that allow you to access the timestamp of a batch:

 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream

 I think the timestamp is the end of the batch, not the beginning. For
 example, I compute runtime taking the difference between now() and the time
 I get as a parameter in foreachRDD().
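A minimal sketch of that overload (assuming a DStream[String] called lines;
the output path is a placeholder and Time.milliseconds gives the batch time as
a Unix timestamp in milliseconds):

import org.apache.spark.streaming.Time

lines.foreachRDD { (rdd, time: Time) =>
  // 'time' marks the batch this RDD belongs to
  val batchMillis = time.milliseconds
  rdd.saveAsTextFile(s"hdfs:///output/result-$batchMillis")  // placeholder path
}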

 Tobias



 On Thu, Jul 24, 2014 at 6:39 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a question regarding Spark streaming. When we use the
 saveAsTextFiles function and my batch is 60 seconds, Spark will generate a
 series of files such as:

 result-140614896, result-140614802, result-140614808, etc.

 I think this is the timestamp for the beginning of each batch. How can we
 extract the variable and use it in our code? Thanks!

 Bill





Re: spark streaming rate limiting from kafka

2014-07-22 Thread Bill Jay
Hi Tobias,

I tried to use 10 as numPartitions. The number of executors allocated is the
number of DStreams. Therefore, it seems the parameter does not spread data
across many partitions. In order to do that, it seems we have to call
repartition. If numPartitions would distribute the data to multiple
executors/partitions, then I would be able to save the running time incurred
by repartition.

Bill




On Mon, Jul 21, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 numPartitions means the number of Spark partitions that the data received
 from Kafka will be split to. It has nothing to do with Kafka partitions, as
 far as I know.

 If you create multiple Kafka consumers, it doesn't seem like you can
 specify which consumer will consume which Kafka partitions. Instead, Kafka
 (at least with the interface that is exposed by the Spark Streaming API)
 will do something called rebalance and assign Kafka partitions to consumers
 evenly, you can see this in the client logs.

 When using multiple Kafka consumers with auto.offset.reset = true, please
 expect to run into this one:
 https://issues.apache.org/jira/browse/SPARK-2383

 Tobias


 On Tue, Jul 22, 2014 at 3:40 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

  I am currently creating multiple DStreams to consume from different topics.
  How can I let each consumer consume from different partitions? I find the
  following parameters in the Spark API:

  createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
      jssc: JavaStreamingContext,
      keyTypeClass: Class[K],
      valueTypeClass: Class[V],
      keyDecoderClass: Class[U],
      valueDecoderClass: Class[T],
      kafkaParams: Map[String, String],
      topics: Map[String, Integer],
      storageLevel: StorageLevel
  ): JavaPairReceiverInputDStream[K, V]

  Create an input stream that pulls messages from a Kafka Broker.




 The topics parameter is:
  *Map of (topic_name -> numPartitions) to consume. Each partition is
  consumed in its own thread*

  Does numPartitions mean the total number of partitions to consume from
  topic_name, or the index of the partition? How can we specify for each
  createStream call which partition of the Kafka topic to consume? If we can,
  I think I will get a lot of parallelism from the source of the data. Thanks!

 Bill

 On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

  You can create multiple Kafka streams to partition your topics across
  them, which will run multiple receivers or multiple executors. This is
  covered in the Spark streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. It's in the master branch, and will be
 available in Spark 1.1. It basically sets a limit at the receiver level
 (so it applies to all sources) on the max records per second that will be
 received by the receiver.
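Once that is available, the limit is just a configuration entry; a minimal
sketch, assuming the Spark 1.1 key spark.streaming.receiver.maxRate (records
per second per receiver; the value and app name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("rate-limited-consumer")               // placeholder app name
  // cap each receiver at 10,000 records per second (example value)
  .set("spark.streaming.receiver.maxRate", "10000")
val ssc = new StreamingContext(conf, Seconds(60))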

 TD


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

  I also have an issue consuming from Kafka. When I consume from Kafka,
  there is always a single executor working on this job. Even if I use
  repartition, it seems that there is still a single executor. Does anyone
  have an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.
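A minimal sketch of passing that parameter through the kafkaParams variant of
createStream (the quorum, group id and topic name are placeholders; "smallest"
starts from the earliest retained offset, "largest" from the newest):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "zookeeper.connect" -> "zk1:2181,zk2:2181,zk3:2181",  // placeholder quorum
  "group.id" -> "my-group",                             // placeholder group id
  "auto.offset.reset" -> "smallest")                    // or "largest"

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("topic1" -> 1), StorageLevel.MEMORY_AND_DISK_SER)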

 Tobias




 --
 Chen Song









combineByKey at ShuffledDStream.scala

2014-07-22 Thread Bill Jay
Hi all,

I am currently running a Spark Streaming program, which consumes data from
Kafka and does a group by operation on the data. I am trying to optimize the
running time of the program because it looks slow to me. It seems the stage
named:

* combineByKey at ShuffledDStream.scala:42 *

always takes the longest running time. And if I open this stage, I only see
two executors on this stage. Does anyone have an idea what this stage does
and how to increase the speed for this stage? Thanks!

Bill


Spark Streaming: no job has started yet

2014-07-22 Thread Bill Jay
Hi all,

I am running a spark streaming job. The job hangs on one stage, which shows
as follows:

Details for Stage 4
Summary Metrics: No tasks have started yet
Tasks: No tasks have started yet



Does anyone have an idea on this?

Thanks!

Bill
Bill


Re: spark streaming rate limiting from kafka

2014-07-21 Thread Bill Jay
Hi Tathagata,

I am currently creating multiple DStreams to consume from different topics.
How can I let each consumer consume from different partitions? I find the
following parameters in the Spark API:

createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
    jssc: JavaStreamingContext,
    keyTypeClass: Class[K],
    valueTypeClass: Class[V],
    keyDecoderClass: Class[U],
    valueDecoderClass: Class[T],
    kafkaParams: Map[String, String],
    topics: Map[String, Integer],
    storageLevel: StorageLevel
): JavaPairReceiverInputDStream[K, V]

Create an input stream that pulls messages from a Kafka Broker.




The topics parameter is:
*Map of (topic_name -> numPartitions) to consume. Each partition is
consumed in its own thread*

Does numPartitions mean the total number of partitions to consume from
topic_name, or the index of the partition? How can we specify for each
createStream call which partition of the Kafka topic to consume? If we can,
I think I will get a lot of parallelism from the source of the data. Thanks!

Bill

On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 You can create multiple Kafka streams to partition your topics across them,
 which will run multiple receivers or multiple executors. This is covered in
 the Spark streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
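A minimal sketch of that pattern (zkQuorum, group and topicMap are the same
placeholders as in the code earlier in this thread; numReceivers controls how
many receivers run in parallel):

// Run several receivers in parallel and union their output into one DStream.
val numReceivers = 4
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
}
val unified = ssc.union(kafkaStreams)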

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. It's in the master branch, and will be
 available in Spark 1.1. It basically sets a limit at the receiver level
 (so it applies to all sources) on the max records per second that will be
 received by the receiver.

 TD


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

  I also have an issue consuming from Kafka. When I consume from Kafka,
  there is always a single executor working on this job. Even if I use
  repartition, it seems that there is still a single executor. Does anyone
  have an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song







Re: spark streaming rate limiting from kafka

2014-07-20 Thread Bill Jay
Hi Tobias,

It seems that repartition can create more executors for the stages
following data receiving. However, the number of executors is still far
less than what I require (I specify one core for each executor). Based on
the indices of the executors in the stage, I find many numbers are missing in
between. For example, if I call repartition(100), the indices of the executors
may be 1, 3, 5, 10, etc. In the end, there may be only 45 executors although I
request 100 partitions.

Bill


On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

  I also have an issue consuming from Kafka. When I consume from Kafka,
  there is always a single executor working on this job. Even if I use
  repartition, it seems that there is still a single executor. Does anyone
  have an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song






Re: Spark Streaming timestamps

2014-07-18 Thread Bill Jay
Hi Tathagata,




On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 The RDD parameter in foreachRDD contains the raw/transformed data from the
 last batch. So when foreachRDD is called with the time parameter as 5:02:01
 and the batch size is 1 minute, then the RDD will contain data based on the
 data received between 5:02:00 and 5:02:01.

Do you mean the data between 5:02:02 and 5:02:01? The time parameter is
5:02:01. Moreover, when the program is running, it is very difficult to
specify a starting time because sometimes it is difficult to know when the
program executes that line. And do we need a different time parameter for
each foreachRDD, or will Spark calculate the next one according to the batch
interval?


 If you want to do custom intervals, then I suggest the following:
 1. Use a 1-second batch interval.
 2. Then in foreachRDD, from 5:02:30 to 5:03:28, put all the RDDs in
 an ArrayBuffer/ListBuffer.
 3. At 5:03:29, add the RDD to the buffer, do a union of all the
 buffered RDDs, and process them.

 So in foreachRDD, based on the time, buffer the RDDs until you reach the
 appropriate time. Then union all the buffered RDDs and process them.
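A minimal sketch of that buffering approach (dstream, the window length and the
process() call are placeholders; untested):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val buffer = new ArrayBuffer[RDD[String]]()
val windowMillis = 60 * 1000L   // length of the custom interval
var windowEnd = -1L             // end of the current custom interval

dstream.foreachRDD { (rdd, time: Time) =>
  if (windowEnd < 0) windowEnd = time.milliseconds + windowMillis
  buffer += rdd
  if (time.milliseconds >= windowEnd) {
    // union and process everything collected for this custom interval
    val combined = rdd.context.union(buffer)
    process(combined)           // user-defined output function (placeholder)
    buffer.clear()
    windowEnd += windowMillis
  }
}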

 TD


 On Thu, Jul 17, 2014 at 2:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Thanks for your answer. Please see my further question below:


 On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Answers inline.


 On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently using Spark Streaming to conduct a real-time data
 analytics. We receive data from Kafka. We want to generate output files
 that contain results that are based on the data we receive from a specific
 time interval.

 I have several questions on Spark Streaming's timestamp:

 1) If I use saveAsTextFiles, it seems Spark streaming will generate
 files in complete minutes, such as 5:00:01, 5:00:01 (converted from Unix
 time), etc. Does this mean the results are based on the data from 5:00:01
 to 5:00:02, 5:00:02 to 5:00:03, etc. Or the time stamps just mean the time
 the files are generated?

 File named  5:00:01 contains results from data received between
  5:00:00 and  5:00:01 (based on system time of the cluster).



 2) If I do not use saveAsTextFiles, how do I get the exact time
 interval of the RDD when I use foreachRDD to do custom output of the
 results?

  There is a version of foreachRDD which allows you to specify a function
  that takes in a Time object.


 3) How can we specify the starting time of the batches?


 What do you mean? Batches are timed based on the system time of the
 cluster.

  I would like to control the starting time and ending time of each batch.
  For example, if I use saveAsTextFiles as the output method and the batch size
  is 1 minute, Spark will align time intervals to complete minutes, such as
  5:01:00, 5:02:00, 5:03:00. It will not have results at times such as 5:01:03,
  5:02:03, 5:03:03, etc. My goal is to generate output for a customized
  interval such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.

  I checked the API of foreachRDD with a time parameter. It seems there is
  no explanation of what that parameter means. Does it mean the starting
  time of the first batch?




 Thanks!

 Bill







Re: Error: No space left on device

2014-07-17 Thread Bill Jay
Hi,

I also have some issues with repartition. In my program, I consume data
from Kafka. After I consume the data, I use repartition(N). However, although I
set N to 120, there are only around 18 executors allocated for my reduce
stage. I am not sure how the repartition command works to ensure the desired
parallelism.

Bill


On Thu, Jul 17, 2014 at 12:56 AM, Xiangrui Meng men...@gmail.com wrote:

 Set N be the total number of cores on the cluster or less. sc.textFile
 doesn't always give you that number, depends on the block size. For
 MovieLens, I think the default behavior should be 2~3 partitions. You
 need to call repartition to ensure the right number of partitions.

 Which EC2 instance type did you use? I usually use m3.2xlarge or c?
 instances that come with SSD and 1G or 10G network. For those
 instances, you should see local drives mounted at /mnt, /mnt2, /mnt3,
 ... Make sure there was no error when you used the ec2 script to
 launch the cluster.

 It is a little strange to see 94% of / was used on a slave. Maybe
 shuffle data went to /. I'm not sure which settings went wrong. I
 recommend trying re-launching a cluster with m3.2xlarge instances and
 using the default settings (don't set anything in SparkConf). Submit
 the application with --driver-memory 20g.

 The running times are slower than what I remember, but it depends on
 the instance type.

 Best,
 Xiangrui



 On Wed, Jul 16, 2014 at 10:18 PM, Chris DuBois chris.dub...@gmail.com
 wrote:
  Hi Xiangrui,
 
  I will try this shortly. When using N partitions, do you recommend N be
 the
  number of cores on each slave or the number of cores on the master?
 Forgive
  my ignorance, but is this best achieved as an argument to sc.textFile?
 
  The slaves on the EC2 clusters start with only 8gb of storage, and it
  doesn't seem that /mnt/spark and /mnt2/spark are mounted anywhere else by
  default. Looking at spark-ec2/setup-slaves.sh, it appears that these are
  only mounted if the instance type begins with r3. (Or am I not reading
 that
  right?) My slaves are a different instance type, and currently look like
  this:
   Filesystem      Size  Used Avail Use% Mounted on
   /dev/xvda1      7.9G  7.3G  515M  94% /
   tmpfs           7.4G  4.0K  7.4G   1% /dev/shm
   /dev/xvdv       500G  2.5G  498G   1% /vol
 
  I have been able to finish ALS on MovieLens 10M only twice, taking 221s
 and
  315s for 20 iterations at K=20 and lambda=0.01. Does that timing sound
 about
  right, or does it point to a poor configuration? The same script with
  MovieLens 1M runs fine in about 30-40s with the same settings. (In both
  cases I'm training on 70% of the data.)
 
  Thanks for your help!
  Chris
 
 
  On Wed, Jul 16, 2014 at 4:29 PM, Xiangrui Meng men...@gmail.com wrote:
 
  For ALS, I would recommend repartitioning the ratings to match the
  number of CPU cores or even less. ALS is not computation heavy for
  small k but communication heavy. Having small number of partitions may
  help. For EC2 clusters, we use /mnt/spark and /mnt2/spark as the
  default local directory because they are local hard drives. Did your
  last run of ALS on MovieLens 10M-100K with the default settings
  succeed? -Xiangrui
 
  On Wed, Jul 16, 2014 at 8:00 AM, Chris DuBois chris.dub...@gmail.com
  wrote:
   Hi Xiangrui,
  
   I accidentally did not send df -i for the master node. Here it is at
 the
   moment of failure:
  
    Filesystem      Inodes  IUsed     IFree IUse% Mounted on
    /dev/xvda1      524288 280938    243350   54% /
    tmpfs          3845409      1   3845408    1% /dev/shm
    /dev/xvdb     10002432   1027  10001405    1% /mnt
    /dev/xvdf     10002432     16  10002416    1% /mnt2
    /dev/xvdv    524288000     13 524287987    1% /vol
  
   I am using default settings now, but is there a way to make sure that
   the
   proper directories are being used? How many blocks/partitions do you
   recommend?
  
   Chris
  
  
   On Wed, Jul 16, 2014 at 1:09 AM, Chris DuBois chris.dub...@gmail.com
 
   wrote:
  
   Hi Xiangrui,
  
   Here is the result on the master node:
   $ df -i
    Filesystem      Inodes  IUsed     IFree IUse% Mounted on
    /dev/xvda1      524288 273997    250291   53% /
    tmpfs          1917974      1   1917973    1% /dev/shm
    /dev/xvdv    524288000     30 524287970    1% /vol
  
   I have reproduced the error while using the MovieLens 10M data set
 on a
   newly created cluster.
  
   Thanks for the help.
   Chris
  
  
   On Wed, Jul 16, 2014 at 12:22 AM, Xiangrui Meng men...@gmail.com
   wrote:
  
   Hi Chris,
  
   Could you also try `df -i` on the master node? How many
   blocks/partitions did you set?
  
   In the current implementation, ALS doesn't clean the shuffle data
   because the operations are chained together. But it shouldn't run
 out
   of disk space on the MovieLens dataset, which is small. spark-ec2
   script sets /mnt/spark and /mnt/spark2 as the local.dir by 

Re: Spark Streaming timestamps

2014-07-17 Thread Bill Jay
Hi Tathagata,

Thanks for your answer. Please see my further question below:


On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Answers inline.


 On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently using Spark Streaming to conduct a real-time data
 analytics. We receive data from Kafka. We want to generate output files
 that contain results that are based on the data we receive from a specific
 time interval.

 I have several questions on Spark Streaming's timestamp:

 1) If I use saveAsTextFiles, it seems Spark streaming will generate files
 in complete minutes, such as 5:00:01, 5:00:01 (converted from Unix time),
 etc. Does this mean the results are based on the data from 5:00:01 to
 5:00:02, 5:00:02 to 5:00:03, etc. Or the time stamps just mean the time the
 files are generated?

 File named  5:00:01 contains results from data received between  5:00:00
 and  5:00:01 (based on system time of the cluster).



 2) If I do not use saveAsTextFiles, how do I get the exact time interval
 of the RDD when I use foreachRDD to do custom output of the results?

 There is a version of foreachRDD which allows you to specify a function
 that takes in a Time object.


 3) How can we specify the starting time of the batches?


 What do you mean? Batches are timed based on the system time of the
 cluster.

I would like to control the starting time and ending time of each batch.
For example, if I use saveAsTextFiles as the output method and the batch size
is 1 minute, Spark will align time intervals to complete minutes, such as
5:01:00, 5:02:00, 5:03:00. It will not have results at times such as 5:01:03,
5:02:03, 5:03:03, etc. My goal is to generate output for a customized
interval such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.

I checked the API of foreachRDD with a time parameter. It seems there is no
explanation of what that parameter means. Does it mean the starting
time of the first batch?




 Thanks!

 Bill





Re: spark streaming rate limiting from kafka

2014-07-17 Thread Bill Jay
I also have an issue consuming from Kafka. When I consume from Kafka, there
is always a single executor working on this job. Even if I use repartition,
it seems that there is still a single executor. Does anyone have an idea how
to add parallelism to this job?



On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song




Re: Number of executors change during job running

2014-07-16 Thread Bill Jay
Hi Tathagata,

I have tried the repartition method. The reduce stage first had 2 executors
and then it had around 85 executors. I specified repartition(300), and each
executor was given 2 cores when I submitted the job. This shows that
repartition does increase the number of executors. However, the running
time was still around 50 seconds although I only did a simple groupBy
operation. I think repartition may consume part of the running time.
Considering that the input source is Kafka, is there a way to make the program
even faster? Thanks!


On Mon, Jul 14, 2014 at 3:22 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you give me a screen shot of the stages page in the web ui, the spark
 logs, and the code that is causing this behavior. This seems quite weird to
 me.

 TD


 On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 It seems repartition does not necessarily force Spark to distribute the
 data into different executors. I have launched a new job which uses
 repartition right after I received data from Kafka. For the first two
 batches, the reduce stage used more than 80 executors. Starting from the
 third batch, there were always only 2 executors in the reduce task
 (combineByKey). Even with the first batch which used more than 80
 executors, it took 2.4 mins to finish the reduce stage for a very small
 amount of data.

 Bill


 On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

  After using repartition(300), how many executors did it run on? By the
  way, repartition(300) means it will divide the shuffled data into 300
  partitions. Since there are many cores on each of the 300
  machines/executors, these partitions (each requiring a core) may not be
  spread across all 300 executors. Hence, if you really want to spread it across
  all 300 executors, you may have to bump up the partitions even more. However,
  increasing the partitions too much may not be beneficial, and you will
  have to play around with the number to figure out the sweet spot that reduces
  the time to process the stage / time to process the whole batch.

 TD


 On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Do you mean that the data is not shuffled until the reduce stage? That
 means groupBy still only uses 2 machines?

 I think I used repartition(300) after I read the data from Kafka into
 DStream. It seems that it did not guarantee that the map or reduce stages
 will be run on 300 machines. I am currently trying to initiate 100 DStream
 from KafkaUtils.createDStream and union them. Now the reduce stages had
 around 80 machines for all the batches. However, this method will introduce
 many dstreams. It will be good if we can control the number of executors in
 the groupBy operation because the calculation needs to be finished within 1
 minute for different size of input data based on our production need.

 Thanks!


 Bill


 On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

  Aah, I get it now. That is because the input data stream is
  replicated on two machines, so by locality the data is processed on those
  two machines. So the map stage on the data uses 2 executors, but the
  reduce stage (after groupByKey), the saveAsTextFiles, would use 300 tasks.
  And the default parallelism takes effect only when the data is
  explicitly shuffled around.

 You can fix this by explicitly repartitioning the data.

 inputDStream.repartition(partitions)

 This is covered in the streaming tuning guide
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
 .
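Concretely, a minimal sketch of where the repartition would go in a
receiver-based Kafka job (the numbers are illustrative and extractKey is a
placeholder for whatever produces the grouping key):

val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// Spread the received blocks across the cluster before the shuffle stage,
// and also pass an explicit partition count to the ByKey operation.
val repartitioned = kafkaStream.repartition(300)
val pairs = repartitioned.map(line => (extractKey(line), line))
val grouped = pairs.groupByKey(300)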

 TD



 On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi folks,

  I just ran another job that only received data from Kafka, did some
  filtering, and then saved the results as text files in HDFS. There was no
  reducing work involved. Surprisingly, the number of executors for the
  saveAsTextFiles stage was also 2, although I specified 300 executors in the
  job submission. As a result, the simple save-file action took more than 2
  minutes. Do you have any idea how Spark determined the number of executors
  for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
  wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which may not
 possible increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val partition = 300
 val zkQuorum = "zk1,zk2,zk3"
 val group = "my-group-" + currentTime.toString
 val topics = topic1,topic2

Spark Streaming timestamps

2014-07-16 Thread Bill Jay
Hi all,

I am currently using Spark Streaming to conduct a real-time data analytics.
We receive data from Kafka. We want to generate output files that contain
results that are based on the data we receive from a specific time
interval.

I have several questions on Spark Streaming's timestamp:

1) If I use saveAsTextFiles, it seems Spark streaming will generate files
in complete minutes, such as 5:00:01, 5:00:01 (converted from Unix time),
etc. Does this mean the results are based on the data from 5:00:01 to
5:00:02, 5:00:02 to 5:00:03, etc. Or the time stamps just mean the time the
files are generated?

2) If I do not use saveAsTextFiles, how do I get the exact time interval of
the RDD when I use foreachRDD to do custom output of the results?

3) How can we specify the starting time of the batches?

Thanks!

Bill


Re: Number of executors change during job running

2014-07-14 Thread Bill Jay
Hi Tathagata,

It seems repartition does not necessarily force Spark to distribute the
data into different executors. I have launched a new job which uses
repartition right after I received data from Kafka. For the first two
batches, the reduce stage used more than 80 executors. Starting from the
third batch, there were always only 2 executors in the reduce task
(combineByKey). Even with the first batch which used more than 80
executors, it took 2.4 mins to finish the reduce stage for a very small
amount of data.

Bill


On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

  After using repartition(300), how many executors did it run on? By the
  way, repartition(300) means it will divide the shuffled data into 300
  partitions. Since there are many cores on each of the 300
  machines/executors, these partitions (each requiring a core) may not be
  spread across all 300 executors. Hence, if you really want to spread it across
  all 300 executors, you may have to bump up the partitions even more. However,
  increasing the partitions too much may not be beneficial, and you will
  have to play around with the number to figure out the sweet spot that reduces
  the time to process the stage / time to process the whole batch.

 TD


 On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Do you mean that the data is not shuffled until the reduce stage? That
 means groupBy still only uses 2 machines?

 I think I used repartition(300) after I read the data from Kafka into
 DStream. It seems that it did not guarantee that the map or reduce stages
 will be run on 300 machines. I am currently trying to initiate 100 DStream
 from KafkaUtils.createDStream and union them. Now the reduce stages had
 around 80 machines for all the batches. However, this method will introduce
 many dstreams. It will be good if we can control the number of executors in
 the groupBy operation because the calculation needs to be finished within 1
 minute for different size of input data based on our production need.

 Thanks!


 Bill


 On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

  Aah, I get it now. That is because the input data stream is replicated
  on two machines, so by locality the data is processed on those two
  machines. So the map stage on the data uses 2 executors, but the reduce
  stage (after groupByKey), the saveAsTextFiles, would use 300 tasks. And the
  default parallelism takes effect only when the data is explicitly
  shuffled around.

 You can fix this by explicitly repartitioning the data.

 inputDStream.repartition(partitions)

 This is covered in the streaming tuning guide
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
 .

 TD



 On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then save as text files in HDFS. There was no reducing work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determined the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which may not
 possible increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val partition = 300
 val zkQuorum = "zk1,zk2,zk3"
 val group = "my-group-" + currentTime.toString
 val topics = "topic1,topic2,topic3,topic4"
 val numThreads = 4
 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + "checkpoint")
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)
   .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data => (data("id").toString,
   timestampToUTCUnix(data("time").toString),
   timestampToUTCUnix(data("local_time").toString), data("id2").toString,
   data("id3").toString,
   data("log_type").toString, data("sub_log_type").toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)

 val

Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Praveen,

I did not change the number of total executors. I specified 300 as the
number of executors when I submitted the jobs. However, for some stages,
the number of executors is very small, leading to long calculation times
even for a small data set. That means not all executors were used for some
stages.

If I went into the details of the running times of different executors, I found
some of them had very low running times while a few had very long running
times, leading to a long overall running time. Another point I noticed is that
the number of completed tasks is usually larger than the number of total
tasks. That means sometimes the job is still running in some stages
although all the tasks have been finished. These are the two behaviors I
observed that may be related to the wrong running times.

Bill


On Thu, Jul 10, 2014 at 11:26 PM, Praveen Seluka psel...@qubole.com wrote:

 If I understand correctly, you cannot change the number of executors at
 runtime, right (correct me if I am wrong)? It's defined when we start the
 application and is fixed. Do you mean the number of tasks?


 On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more than 3 minutes. Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on YARN. It consumes data from
 Kafka and groups the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I 
 checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there
 are sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocates the number of executors for
 different stages and how to increase the efficiency of the task? Thanks!

 Bill








Re: Join two Spark Streaming

2014-07-11 Thread Bill Jay
Hi Tathagata,

Thanks for the solution. Actually, I will use the number of unique integers
in the batch instead of the accumulative number of unique integers.
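For that per-batch variant, a minimal sketch (assuming a DStream[Int] called
intStream; everything is computed inside each batch, so no cross-batch state is
needed):

// For each batch: count appearances per integer and divide by the number of
// distinct integers seen in that same batch.
val ratios = intStream.transform { rdd =>
  val uniqueCount = rdd.distinct().count()
  rdd.map(x => (x, 1L))
     .reduceByKey(_ + _)
     .map { case (value, cnt) => (value, cnt.toDouble / uniqueCount) }
}
ratios.print()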

I do have two questions about your code:

1. Why do we need uniqueValuesRDD?  Why do we need to call
uniqueValuesRDD.checkpoint()?

2. Where is distinctValues defined?

Bill


On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Do you want to continuously maintain the set of unique integers seen since
 the beginning of stream?

 var uniqueValuesRDD: RDD[Int] = ...

 dstreamOfIntegers.transform(newDataRDD => {
    // accumulate the distinct values seen so far
    val newUniqueValuesRDD = newDataRDD.union(distinctValues).distinct
    uniqueValuesRDD = newUniqueValuesRDD

    // periodically call uniqueValuesRDD.checkpoint()

    val uniqueCount = uniqueValuesRDD.count()
    newDataRDD.map(x => x / uniqueCount)
 })





 On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

  I am working on a pipeline that needs to join two Spark streams. The
  input is a stream of integers. And the output is the number of times each
  integer appears divided by the total number of unique integers. Suppose the
  input is:

 1
 2
 3
 1
 2
 2

 There are 3 unique integers and 1 appears twice. Therefore, the output
 for the integer 1 will be:
 1 0.67

 Since the input is from a stream, it seems we need to first join the
 appearance of the integers and the total number of unique integers and then
 do a calculation using map. I am thinking of adding a dummy key to both
 streams and use join. However, a Cartesian product matches the application
 here better. How to do this effectively? Thanks!

 Bill





Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Bill Jay
I have met similar issues. The reason is probably that spark-streaming-kafka
is not included in the Spark assembly. Currently, I am using
Maven to generate a shaded package with all the dependencies. You may try
to use sbt assembly to include the dependencies in your jar file.

Bill


On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote:

  Hi Akhil,

 Can you please guide me through this? Because the code I am running
 already has this in it:
 [java]

 SparkContext sc = new SparkContext();

  sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");


 Is there something I am missing?

 Thanks,
 Dilip


 On Friday 11 July 2014 12:02 PM, Akhil Das wrote:

  Easiest fix would be adding the kafka jars to the SparkContext while
 creating it.

  Thanks
 Best Regards


 On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote:

 Hi,

 I am trying to run a program with spark streaming using Kafka on a stand
 alone system. These are my details:

 Spark 1.0.0 hadoop2
 Scala 2.10.3

 I am trying a simple program using my custom sbt project but this is the
 error I am getting:

 Exception in thread main java.lang.NoClassDefFoundError:
 kafka/serializer/StringDecoder
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
 at
 org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
 at SimpleJavaApp.main(SimpleJavaApp.java:40)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException:
 kafka.serializer.StringDecoder
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 11 more


 here is my .sbt file:

 name := "Simple Project"

 version := "1.0"

 scalaVersion := "2.10.3"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"

 libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"

 libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

 resolvers += "Maven Repository" at "http://central.maven.org/maven2/"


 sbt package was successful. I also tried sbt ++2.10.3 package to build
 it for my scala version. Problem remains the same.
 Can anyone help me out here? I've been stuck on this for quite some time
 now.

 Thank You,
 Dilip






Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

I also tried to use the number of partitions as a parameter to functions
such as groupByKey. It seems the number of executors is around 50 instead
of 300, which is the number of executors I specified in the submission
script. Moreover, the running time of different executors is skewed. The
ideal case is that Spark can distribute the data to 300 executors evenly
so that the computation can be finished efficiently. I am not sure how to
achieve this.

Thanks!

Bill


On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more than 3 minutes. Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on YARN. It consumes data from
 Kafka and groups the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there are
 sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocates the number of executors for
 different stages and how to increase the efficiency of the task? Thanks!

 Bill







Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Bill Jay
You may try to use this one:

https://github.com/sbt/sbt-assembly

I had an issue with duplicate files in the uber jar. But I think this
library will assemble the dependencies into a single jar file.
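For what it is worth, a rough, untested sketch of the sbt-assembly wiring that
usually deals with the duplicate-file problem (the plugin version, the
"provided" scoping and the merge rules are assumptions; adjust them to your sbt
and plugin versions):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt -- mark Spark itself as "provided" so only the Kafka client and
// the streaming-kafka connector end up in the uber jar, and pick a merge
// strategy for duplicate META-INF entries.
import AssemblyKeys._

assemblySettings

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"

mergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}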

Bill


On Fri, Jul 11, 2014 at 1:34 AM, Dilip dilip_ram...@hotmail.com wrote:

  A simple
 sbt assembly
 is not working. Is there any other way to include particular jars with
 assembly command?

 Regards,
 Dilip

 On Friday 11 July 2014 12:45 PM, Bill Jay wrote:

  I have met similar issues. The reason is probably that spark-streaming-kafka
  is not included in the Spark assembly. Currently, I am using
  Maven to generate a shaded package with all the dependencies. You may try
  to use sbt assembly to include the dependencies in your jar file.

  Bill


 On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote:

  Hi Akhil,

 Can you please guide me through this? Because the code I am running
 already has this in it:
 [java]

 SparkContext sc = new SparkContext();

  sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");


 Is there something I am missing?

 Thanks,
 Dilip


 On Friday 11 July 2014 12:02 PM, Akhil Das wrote:

  Easiest fix would be adding the kafka jars to the SparkContext while
 creating it.

  Thanks
 Best Regards


 On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote:

 Hi,

 I am trying to run a program with spark streaming using Kafka on a stand
 alone system. These are my details:

 Spark 1.0.0 hadoop2
 Scala 2.10.3

 I am trying a simple program using my custom sbt project but this is the
 error I am getting:

 Exception in thread main java.lang.NoClassDefFoundError:
 kafka/serializer/StringDecoder
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
 at
 org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
 at SimpleJavaApp.main(SimpleJavaApp.java:40)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException:
 kafka.serializer.StringDecoder
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 11 more


 here is my .sbt file:

 name := "Simple Project"

 version := "1.0"

 scalaVersion := "2.10.3"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"

 libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"

 libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

 resolvers += "Maven Repository" at "http://central.maven.org/maven2/"


 sbt package was successful. I also tried sbt ++2.10.3 package to build
 it for my scala version. Problem remains the same.
 Can anyone help me out here? I've been stuck on this for quite some time
 now.

 Thank You,
 Dilip








Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

Below is my main function. I omit some filtering and data conversion
functions. These functions are just a one-to-one mapping, which should not
noticeably increase the running time. The only reduce function I have here is
groupByKey. There are 4 topics in my Kafka brokers; two of the topics
have 240k lines each minute, and the other two topics have less than 30k
lines per minute. The batch size is one minute and I specified 300
executors in my spark-submit script. The default parallelism is 300.


val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString,
  timestampToUTCUnix(data("time").toString),
  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
  data("id3").toString,
  data("log_type").toString, data("sub_log_type").toString))
val timeDiff = 3600L
val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)

val watchTimeFields = filteredFields.map(fields => (fields._1,
  fields._2, fields._4, fields._5, fields._7))
val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
val programDuids = watchTimeTuples.map(fields => (fields._3,
  fields._1)).groupByKey(partition)
val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
programDuidNum.saveAsTextFiles(hadoopOutput + "result")

I have been working on this for several days but have not found why there are
always 2 executors for the groupBy stage. Thanks a lot!

Bill
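
A minimal sketch of the explicit-repartition workaround suggested further down in this
thread, applied to the code above; repartitionedLines is the only new name, and the
partition count simply reuses the 300 from the submission script:

// spread the received Kafka blocks across the cluster before the map/groupByKey
// work, instead of relying on the locality of the two receiver machines
val repartitionedLines = lines.repartition(partition)

val jsonData = repartitionedLines.map(JSON.parseFull(_))
// ... the same filtering and mapping as above ...

// the shuffle stage still takes the partition count explicitly
val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1))
  .groupByKey(partition)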


On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you show us the program that you are running? If you are setting the
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed across the 50 executors allocated
 to your context. However, the data distribution may be skewed, in which case
 you can use a repartition operation to redistribute the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as a parameter to the
 functions such as groupByKey. It seems the number of executors is around
 50 instead of 300, which is the number of executors I specified in the
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number of partitions in all the shuffle-based
 DStream operations explicitly? It may be that the default
 parallelism (that is, spark.default.parallelism) is not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web UI. It will show the breakdown of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set the default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer, such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more than 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consumes data from
 Kafka and groups the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes takes more than 3 minutes to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specified 300 as the partition number for groupBy. When I
 checked
 the slow stage

Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi folks,

I just ran another job that only received data from Kafka, did some
filtering, and then saved the results as text files in HDFS. There was no reducing work
involved. Surprisingly, the number of executors for the saveAsTextFiles
stage was also 2 although I specified 300 executors in the job submission.
As a result, the simple save file action took more than 2 minutes. Do you
have any idea how Spark determines the number of executors
for different stages?

Thanks!

Bill


On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just one-to-one mappings, which should not
 noticeably increase the running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. The other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val partition = 300
 val zkQuorum = "zk1,zk2,zk3"
 val group = "my-group-" + currentTime.toString
 val topics = "topic1,topic2,topic3,topic4"
 val numThreads = 4
 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + "checkpoint")
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)
   .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data => (data("id").toString,
   timestampToUTCUnix(data("time").toString),
   timestampToUTCUnix(data("local_time").toString), data("id2").toString,
   data("id3").toString,
   data("log_type").toString, data("sub_log_type").toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)

 val watchTimeFields = filteredFields.map(fields => (fields._1,
   fields._2, fields._4, fields._5, fields._7))
 val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
 val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1))
   .groupByKey(partition)
 val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
 programDuidNum.saveAsTextFiles(hadoopOutput + "result")

 I have been working on this for several days but have not found why there are
 always 2 executors for the groupBy stage. Thanks a lot!

 Bill


 On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you show us the program that you are running? If you are setting the
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed across the 50 executors allocated
 to your context. However, the data distribution may be skewed, in which case
 you can use a repartition operation to redistribute the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as a parameter to the
 functions such as groupByKey. It seems the number of executors is around
 50 instead of 300, which is the number of executors I specified in the
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number of partitions in all the shuffle-based
 DStream operations explicitly? It may be that the default
 parallelism (that is, spark.default.parallelism) is not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web UI. It will show the breakdown of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set the default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer, such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more than 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the
 DStream.ByKey operations? If the reduce by key is not set

Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

Do you mean that the data is not shuffled until the reduce stage? That
means groupBy still only uses 2 machines?

I think I used repartition(300) after I read the data from Kafka into the
DStream. It seems that it did not guarantee that the map or reduce stages
will be run on 300 machines. I am currently trying to initiate 100 DStreams
from KafkaUtils.createStream and union them. Now the reduce stages had
around 80 machines for all the batches. However, this method will introduce
many DStreams. It will be good if we can control the number of executors in
the groupBy operation because the calculation needs to be finished within 1
minute for different sizes of input data based on our production needs.

Thanks!


Bill
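
A rough sketch of the multiple-receiver approach described above, reusing ssc,
zkQuorum, group and topicMap from the code earlier in this thread; the receiver count
of 100 is just the number mentioned above, not a recommendation, and each receiver
occupies one core on an executor, so the cluster needs enough cores for the receivers
plus the processing:

import org.apache.spark.streaming.kafka.KafkaUtils

// create several Kafka receivers so that received blocks land on more machines
val numReceivers = 100
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}

// union them into a single DStream and repartition before the shuffle stages
val unioned = ssc.union(kafkaStreams)
val repartitioned = unioned.repartition(300)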


On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Aah, I get it now. That is because the input data stream is replicated on
 two machines, so by locality the data is processed on those two machines.
 So the map stage on the data uses 2 executors, but the reduce stage
 (after groupByKey), the saveAsTextFiles, would use 300 tasks. And the default
 parallelism takes effect only when the data is explicitly shuffled
 around.

 You can fix this by explicitly repartitioning the data.

 inputDStream.repartition(partitions)

 This is covered in the streaming tuning guide:
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 TD



 On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then saved the results as text files in HDFS. There was no reducing work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determines the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just one-to-one mappings, which should not
 noticeably increase the running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. The other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val partition = 300
 val zkQuorum = "zk1,zk2,zk3"
 val group = "my-group-" + currentTime.toString
 val topics = "topic1,topic2,topic3,topic4"
 val numThreads = 4
 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + "checkpoint")
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)
   .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data => (data("id").toString,
   timestampToUTCUnix(data("time").toString),
   timestampToUTCUnix(data("local_time").toString), data("id2").toString,
   data("id3").toString,
   data("log_type").toString, data("sub_log_type").toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)

 val watchTimeFields = filteredFields.map(fields => (fields._1,
   fields._2, fields._4, fields._5, fields._7))
 val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
 val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1))
   .groupByKey(partition)
 val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
 programDuidNum.saveAsTextFiles(hadoopOutput + "result")

 I have been working on this for several days but have not found why there are
 always 2 executors for the groupBy stage. Thanks a lot!

 Bill


 On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you show us the program that you are running? If you are setting the
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed across the 50 executors allocated
 to your context. However, the data distribution may be skewed, in which case
 you can use a repartition operation to redistribute the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as a parameter to the
 functions such as groupByKey. It seems the number of executors is around
 50 instead of 300, which

Re: Use Spark Streaming to update result whenever data come

2014-07-10 Thread Bill Jay
Tobias,

Your help with the problems I have run into has been very helpful. Thanks a lot!

Bill


On Wed, Jul 9, 2014 at 6:04 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 good to know you found your bottleneck. Unfortunately, I don't know how to
 solve this; until now, I have used Spark only with embarrassingly parallel
 operations such as map or filter. I hope someone else might provide more
 insight here.

 Tobias


 On Thu, Jul 10, 2014 at 9:57 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tobias,

 Now I did the repartition and ran the program again. I found a bottleneck
 in the whole program. In the streaming job, there is a stage marked as
 *combineByKey at ShuffledDStream.scala:42* in the Spark UI. This stage is repeatedly
 executed. However, during some batches, the number of executors allocated
 to this step is only 2 although I used 300 workers and specified the
 partition number as 300. In this case, the program is very slow although
 the data being processed are not big.

 Do you know how to solve this issue?

 Thanks!


 On Wed, Jul 9, 2014 at 5:51 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 I haven't worked with Yarn, but I would try adding a repartition() call
 after you receive your data from Kafka. I would be surprised if that didn't
 help.


 On Thu, Jul 10, 2014 at 6:23 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tobias,

 I was using Spark 0.9 before and the master I used was yarn-standalone.
 In Spark 1.0, the master will be either yarn-cluster or yarn-client. I am
 not sure whether it is the reason why more machines do not provide better
 scalability. What is the difference between these two modes in terms of
 efficiency? Thanks!


 On Tue, Jul 8, 2014 at 5:26 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 do the additional 100 nodes receive any tasks at all? (I don't know
 which cluster you use, but with Mesos you could check client logs in the
 web interface.) You might want to try something like repartition(N) or
 repartition(N*2) (with N the number of your nodes) after you receive your
 data.

 Tobias


 On Wed, Jul 9, 2014 at 3:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tobias,

 Thanks for the suggestion. I have tried to add more nodes, from 300 to
 400. It seems the running time did not improve.


 On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 can't you just add more nodes in order to speed up the processing?

 Tobias


 On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com
  wrote:

 Hi all,

 I have a problem of using Spark Streaming to accept input data and
 update a result.

 The input data comes from Kafka and the output is to report a
 map which is updated with historical data every minute. My current method
 is to set the batch size as 1 minute and use foreachRDD to update this map and
 output the map at the end of the foreachRDD function. However, the current
 issue is that the processing cannot be finished within one minute.

 I am thinking of updating the map whenever new data comes in,
 instead of doing the update when the whole RDD arrives. Is there any idea on
 how to achieve this with a better running time? Thanks!

 Bill











Re: Number of executors change during job running

2014-07-10 Thread Bill Jay
Hi Tathagata,

I set the default parallelism as 300 in my configuration file. Sometimes there
are more executors in a job. However, it is still slow. And I further
observed that most executors take less than 20 seconds but two of them take
much longer, such as 2 minutes. The data size is very small (less than 480k
lines with only 4 fields). I am not sure why the group by operation takes
more than 3 minutes.  Thanks!

Bill


On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consumes data from Kafka
 and groups the data by a certain field. The data size is 480k lines per
 minute where the batch size is 1 minute.

 For some batches, the program sometimes takes more than 3 minutes to finish
 the groupBy operation, which seems slow to me. I allocated 300 workers and
 specified 300 as the partition number for groupBy. When I checked the slow
 stage *combineByKey at ShuffledDStream.scala:42,* there are sometimes
 2 executors allocated for this stage. However, during other batches, the
 executors can be several hundred for the same stage, which means the number
 of executors for the same operations changes.

 Does anyone know how Spark allocates the number of executors for different
 stages and how to increase the efficiency of the tasks? Thanks!

 Bill





Re: Use Spark Streaming to update result whenever data come

2014-07-09 Thread Bill Jay
Hi Tobias,

I was using Spark 0.9 before and the master I used was yarn-standalone. In
Spark 1.0, the master will be either yarn-cluster or yarn-client. I am not
sure whether it is the reason why more machines do not provide better
scalability. What is the difference between these two modes in terms of
efficiency? Thanks!


On Tue, Jul 8, 2014 at 5:26 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 do the additional 100 nodes receive any tasks at all? (I don't know which
 cluster you use, but with Mesos you could check client logs in the web
 interface.) You might want to try something like repartition(N) or
 repartition(N*2) (with N the number of your nodes) after you receive your
 data.

 Tobias


 On Wed, Jul 9, 2014 at 3:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tobias,

 Thanks for the suggestion. I have tried to add more nodes, from 300 to
 400. It seems the running time did not improve.


 On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 can't you just add more nodes in order to speed up the processing?

 Tobias


 On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a problem of using Spark Streaming to accept input data and
 update a result.

 The input data comes from Kafka and the output is to report a map
 which is updated with historical data every minute. My current method is
 to set the batch size as 1 minute and use foreachRDD to update this map and
 output the map at the end of the foreachRDD function. However, the current
 issue is that the processing cannot be finished within one minute.

 I am thinking of updating the map whenever new data comes in, instead of
 doing the update when the whole RDD arrives. Is there any idea on how to
 achieve this with a better running time? Thanks!

 Bill







Number of executors change during job running

2014-07-09 Thread Bill Jay
Hi all,

I have a Spark streaming job running on yarn. It consumes data from Kafka
and groups the data by a certain field. The data size is 480k lines per
minute where the batch size is 1 minute.

For some batches, the program sometimes takes more than 3 minutes to finish
the groupBy operation, which seems slow to me. I allocated 300 workers and
specified 300 as the partition number for groupBy. When I checked the slow
stage *combineByKey at ShuffledDStream.scala:42,* there are sometimes 2
executors allocated for this stage. However, during other batches, the
executors can be several hundred for the same stage, which means the number
of executors for the same operations changes.

Does anyone know how Spark allocates the number of executors for different
stages and how to increase the efficiency of the tasks? Thanks!

Bill


Re: Use Spark Streaming to update result whenever data come

2014-07-09 Thread Bill Jay
Hi Tobias,

Now I did the repartition and ran the program again. I found a bottleneck
in the whole program. In the streaming job, there is a stage marked as
*combineByKey at ShuffledDStream.scala:42* in the Spark UI. This stage is repeatedly
executed. However, during some batches, the number of executors allocated
to this step is only 2 although I used 300 workers and specified the
partition number as 300. In this case, the program is very slow although
the data being processed are not big.

Do you know how to solve this issue?

Thanks!


On Wed, Jul 9, 2014 at 5:51 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 I haven't worked with Yarn, but I would try adding a repartition() call
 after you receive your data from Kafka. I would be surprised if that didn't
 help.


 On Thu, Jul 10, 2014 at 6:23 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tobias,

 I was using Spark 0.9 before and the master I used was yarn-standalone.
 In Spark 1.0, the master will be either yarn-cluster or yarn-client. I am
 not sure whether it is the reason why more machines do not provide better
 scalability. What is the difference between these two modes in terms of
 efficiency? Thanks!


 On Tue, Jul 8, 2014 at 5:26 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 do the additional 100 nodes receive any tasks at all? (I don't know
 which cluster you use, but with Mesos you could check client logs in the
 web interface.) You might want to try something like repartition(N) or
 repartition(N*2) (with N the number of your nodes) after you receive your
 data.

 Tobias


 On Wed, Jul 9, 2014 at 3:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tobias,

 Thanks for the suggestion. I have tried to add more nodes, from 300 to
 400. It seems the running time did not improve.


 On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 can't you just add more nodes in order to speed up the processing?

 Tobias


 On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a problem of using Spark Streaming to accept input data and
 update a result.

 The input data comes from Kafka and the output is to report a map
 which is updated with historical data every minute. My current method is
 to set the batch size as 1 minute and use foreachRDD to update this map and
 output the map at the end of the foreachRDD function. However, the current
 issue is that the processing cannot be finished within one minute.

 I am thinking of updating the map whenever new data comes in, instead
 of doing the update when the whole RDD arrives. Is there any idea on how to
 achieve this with a better running time? Thanks!

 Bill









Join two Spark Streaming

2014-07-08 Thread Bill Jay
Hi all,

I am working on a pipeline that needs to join two Spark streams. The input
is a stream of integers, and the output is, for each integer, the number of its
appearances divided by the total number of unique integers. Suppose the
input is:

1
2
3
1
2
2

There are 3 unique integers and 1 appears twice. Therefore, the output for
the integer 1 will be:
1 0.67

Since the input is from a stream, it seems we need to first join the
appearance counts of the integers with the total number of unique integers and then
do a calculation using map. I am thinking of adding a dummy key to both
streams and using join. However, a Cartesian product matches the application
here better. How can I do this effectively? Thanks!

Bill
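
A minimal per-batch sketch of the dummy-key join idea described above; "ints" is an
assumed DStream[Int] standing in for the integer input, and the ratios are computed
within each batch only (keeping counts across batches would need something like
updateStateByKey or windowed operations):

import org.apache.spark.streaming.StreamingContext._

// per-batch count for each integer, e.g. (1, 2), (2, 3), (3, 1) for the sample input
val counts = ints.map(i => (i, 1L)).reduceByKey(_ + _)

// number of distinct integers in the batch (counts has one element per distinct value)
val numUnique = counts.count()

// attach the same dummy key to both sides so they can be joined
val keyedCounts = counts.map { case (i, c) => (0, (i, c)) }
val keyedUnique = numUnique.map(n => (0, n))

// (integer, count / number of unique integers), e.g. (1, 0.67) for the sample input
val ratios = keyedCounts.join(keyedUnique).map {
  case (_, ((i, c), n)) => (i, c.toDouble / n)
}

ratios.print()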


Re: Use Spark Streaming to update result whenever data come

2014-07-08 Thread Bill Jay
Hi Tobias,

Thanks for the suggestion. I have tried to add more nodes, from 300 to 400.
It seems the running time did not improve.


On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 can't you just add more nodes in order to speed up the processing?

 Tobias


 On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a problem of using Spark Streaming to accept input data and update
 a result.

 The input data comes from Kafka and the output is to report a map
 which is updated with historical data every minute. My current method is
 to set the batch size as 1 minute and use foreachRDD to update this map and
 output the map at the end of the foreachRDD function. However, the current
 issue is that the processing cannot be finished within one minute.

 I am thinking of updating the map whenever new data comes in, instead of
 doing the update when the whole RDD arrives. Is there any idea on how to
 achieve this with a better running time? Thanks!

 Bill





Spark-streaming-kafka error

2014-07-08 Thread Bill Jay
Hi all,

I used sbt to package a program that uses spark-streaming-kafka. The packaging
succeeded. However, when I submitted it to yarn, the job ran for 10 seconds
and there was an error in the log file as follows:

Caused by: java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$


Does anyone know the reason for this? Thanks!


Bill


Re: Spark-streaming-kafka error

2014-07-08 Thread Bill Jay
Hi Tobias,

Currently, I do not bundle any dependencies into my application jar. I
will try that. Thanks a lot!

Bill
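
Besides bundling the dependency into the application jar, another option that is
sometimes used is to pass the missing jars to spark-submit with --jars. The command
below is only a sketch; the class name, application jar and the exact dependency jar
names and versions are hypothetical and depend on the build:

spark-submit --master yarn-cluster \
  --class com.example.MyStreamingApp \
  --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar \
  my-streaming-app_2.10-1.0.jar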


On Tue, Jul 8, 2014 at 5:22 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 have you packaged "org.apache.spark" % "spark-streaming-kafka_2.10" %
 "1.0.0" into your application jar? If I remember correctly, it's not
 bundled with the downloadable compiled version of Spark.

 Tobias


 On Wed, Jul 9, 2014 at 8:18 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I used sbt to package a code that uses spark-streaming-kafka. The
 packaging succeeded. However, when I submitted to yarn, the job ran for 10
 seconds and there was an error in the log file as follows:

 Caused by: java.lang.NoClassDefFoundError:
 org/apache/spark/streaming/kafka/KafkaUtils$


 Does anyone know the reason for this? Thanks!


 Bill





Use Spark Streaming to update result whenever data come

2014-07-02 Thread Bill Jay
Hi all,

I have a problem using Spark Streaming to accept input data and update a
result.

The input data comes from Kafka and the output is to report a map which
is updated with historical data every minute. My current method is to set the
batch size as 1 minute and use foreachRDD to update this map and output the
map at the end of the foreachRDD function. However, the current issue is that
the processing cannot be finished within one minute.

I am thinking of updating the map whenever new data comes in, instead of
doing the update when the whole RDD arrives. Is there any idea on how to
achieve this with a better running time? Thanks!

Bill
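
One common way to get this kind of incremental update in Spark Streaming is to let
the framework keep the running map as per-key state with updateStateByKey, instead of
rebuilding a global map inside foreachRDD. The sketch below is only illustrative:
"events", a DStream[(String, Long)] of key/increment pairs, stands in for the parsed
Kafka input, and the checkpoint and output paths are made up. updateStateByKey
requires checkpointing to be enabled on the StreamingContext.

import org.apache.spark.streaming.StreamingContext._

// merge this batch's values for a key into the running total for that key
def updateCount(newValues: Seq[Long], state: Option[Long]): Option[Long] =
  Some(state.getOrElse(0L) + newValues.sum)

ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

// a DStream of (key, running total) pairs, updated every batch
val runningMap = events.updateStateByKey(updateCount)

// the state can be written out each batch like any other DStream
runningMap.saveAsTextFiles("hdfs:///tmp/running-counts")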


Re: Could not compute split, block not found

2014-07-01 Thread Bill Jay
Hi Tobias,

Your explanation makes a lot of sense. Actually, I tried to use partial
data on the same program yesterday. It has been up for around 24 hours and
is still running correctly. Thanks!

Bill


On Mon, Jun 30, 2014 at 5:53 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 let's say the processing time is t' and the window size t. Spark does not
 *require* t' < t. In fact, for *temporary* peaks in your streaming data, I
 think the way Spark handles it is very nice, in particular since 1) it does
 not mix up the order in which items arrived in the stream, so items from a
 later window will always be processed later, and 2) an increase in
 data will not be punished with high load and unresponsive systems, but with
 disk space consumption instead.

 However, if all of your windows require t' > t processing time (and it's
 not because you are waiting, but because you actually do some computation),
 then you are in bad luck, because if you start processing the next window
 while the previous one is still being processed, you have fewer resources for each
 and processing will take even longer. However, if you are only waiting
 (e.g., for network I/O), then maybe you can employ some asynchronous
 solution where your tasks return immediately and deliver their result via a
 callback later?

 Tobias



 On Tue, Jul 1, 2014 at 2:26 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Tobias,

 Your suggestion is very helpful. I will definitely investigate it.

 Just curious. Suppose the batch size is t seconds. In practice, does
 Spark always require the program to finish processing the data of t seconds
 within t seconds' processing time? Can Spark begin to consume the new batch
 before finishing processing the previous batch? If Spark can do them together,
 it may save the processing time and solve the problem of data piling up.

 Thanks!

 Bill




 On Mon, Jun 30, 2014 at 4:49 AM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 If your batch size is one minute and it takes more than one minute to
 process, then I guess that's what causes your problem. The processing of
 the second batch will not start until the processing of the first is
 finished, which leads to more and more data being stored and waiting for
 processing; check the attached graph for a visualization of what I think
 may happen.

 Can you maybe do something hacky like throwing away a part of the data
 so that processing time gets below one minute, then check whether you still
 get that error?

 Tobias




 On Mon, Jun 30, 2014 at 1:56 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Tobias,

 Thanks for your help. I think in my case, the batch size is 1 minute.
 However, it takes my program more than 1 minute to process 1 minute's
 data. I am not sure whether it is because the unprocessed data pile
 up. Do you have a suggestion on how to check it and solve it? Thanks!

 Bill


 On Sun, Jun 29, 2014 at 7:18 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 were you able to process all information in time, or did maybe some
 unprocessed data pile up? I think when I saw this once, the reason
 seemed to be that I had received more data than would fit in memory,
 while waiting for processing, so old data was deleted. When it was
 time to process that data, it didn't exist any more. Is that a
 possible reason in your case?

 Tobias

 On Sat, Jun 28, 2014 at 5:59 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:
  Hi,
 
  I am running a spark streaming job with 1 minute as the batch size.
 It ran
  around 84 minutes and was killed because of the exception with the
 following
  information:
 
  java.lang.Exception: Could not compute split, block
 input-0-1403893740400
  not found
 
 
  Before it was killed, it was able to correctly generate output for
 each
  batch.
 
  Any help on this will be greatly appreciated.
 
  Bill
 








Re: Could not compute split, block not found

2014-07-01 Thread Bill Jay
Hi Tathagata,

Yes. The input stream is from Kafka and my program reads the data, keeps
all the data in memory, processes the data, and generates the output.

Bill
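
For context, TD's question below is getting at the storage level of the receiver: if
received blocks are kept with a memory-only storage level and processing falls behind,
old blocks can be dropped before they are processed, which matches the "Could not
compute split, block not found" error. A minimal sketch of receiving with a storage
level that can spill to disk instead, reusing ssc, zkQuorum, group and topicMap from
earlier in this archive; treat it as illustrative only:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// keep received blocks serialized, replicated, and allowed to spill to disk
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
  StorageLevel.MEMORY_AND_DISK_SER_2)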


On Mon, Jun 30, 2014 at 11:45 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Are you by any chance using only memory in the storage level of the input
 streams?

 TD


 On Mon, Jun 30, 2014 at 5:53 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 let's say the processing time is t' and the window size t. Spark does not
 *require* t' < t. In fact, for *temporary* peaks in your streaming data, I
 think the way Spark handles it is very nice, in particular since 1) it does
 not mix up the order in which items arrived in the stream, so items from a
 later window will always be processed later, and 2) an increase in
 data will not be punished with high load and unresponsive systems, but with
 disk space consumption instead.

 However, if all of your windows require t' > t processing time (and it's
 not because you are waiting, but because you actually do some computation),
 then you are in bad luck, because if you start processing the next window
 while the previous one is still being processed, you have fewer resources for each
 and processing will take even longer. However, if you are only waiting
 (e.g., for network I/O), then maybe you can employ some asynchronous
 solution where your tasks return immediately and deliver their result via a
 callback later?

 Tobias



 On Tue, Jul 1, 2014 at 2:26 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Tobias,

 Your suggestion is very helpful. I will definitely investigate it.

 Just curious. Suppose the batch size is t seconds. In practice, does
 Spark always require the program to finish processing the data of t seconds
 within t seconds' processing time? Can Spark begin to consume the new batch
 before finishing processing the previous batch? If Spark can do them together,
 it may save the processing time and solve the problem of data piling up.

 Thanks!

 Bill




 On Mon, Jun 30, 2014 at 4:49 AM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 If your batch size is one minute and it takes more than one minute to
 process, then I guess that's what causes your problem. The processing of
 the second batch will not start until the processing of the first is
 finished, which leads to more and more data being stored and waiting for
 processing; check the attached graph for a visualization of what I think
 may happen.

 Can you maybe do something hacky like throwing away a part of the data
 so that processing time gets below one minute, then check whether you still
 get that error?

 Tobias




 On Mon, Jun 30, 2014 at 1:56 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Tobias,

 Thanks for your help. I think in my case, the batch size is 1 minute.
 However, it takes my program more than 1 minute to process 1 minute's
 data. I am not sure whether it is because the unprocessed data pile
 up. Do you have a suggestion on how to check it and solve it? Thanks!

 Bill


 On Sun, Jun 29, 2014 at 7:18 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 were you able to process all information in time, or did maybe some
 unprocessed data pile up? I think when I saw this once, the reason
 seemed to be that I had received more data than would fit in memory,
 while waiting for processing, so old data was deleted. When it was
 time to process that data, it didn't exist any more. Is that a
 possible reason in your case?

 Tobias

 On Sat, Jun 28, 2014 at 5:59 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:
  Hi,
 
  I am running a spark streaming job with 1 minute as the batch size.
 It ran
  around 84 minutes and was killed because of the exception with the
 following
  information:
 
  java.lang.Exception: Could not compute split, block
 input-0-1403893740400
  not found
 
 
  Before it was killed, it was able to correctly generate output for
 each
  batch.
 
  Any help on this will be greatly appreciated.
 
  Bill
 









slf4j multiple bindings

2014-07-01 Thread Bill Jay
Hi all,

I have an issue with multiple slf4j bindings. My program was running
correctly. I just added a new dependency, kryo. And when I submitted a
job, the job was killed because of the following error messages:

*SLF4J: Class path contains multiple SLF4J bindings.*


The log said there were three slf4j bindings:
spark-assembly-0.9.1-hadoop2.3.0.jar, hadoop lib, and my own jar file.

However, I did not explicitly add slf4j in my pom.xml file. I added
exclusions to the kryo dependency but it did not work. Does anyone have
an idea how to fix this issue? Thanks!

Regards,

Bill


Re: Could not compute split, block not found

2014-06-30 Thread Bill Jay
Tobias,

Your suggestion is very helpful. I will definitely investigate it.

Just curious. Suppose the batch size is t seconds. In practice, does Spark
always require the program to finish processing the data of t seconds
within t seconds' processing time? Can Spark begin to consume the new batch
before finishing processing the previous batch? If Spark can do them together,
it may save the processing time and solve the problem of data piling up.

Thanks!

Bill




On Mon, Jun 30, 2014 at 4:49 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 If your batch size is one minute and it takes more than one minute to
 process, then I guess that's what causes your problem. The processing of
 the second batch will not start until the processing of the first is
 finished, which leads to more and more data being stored and waiting for
 processing; check the attached graph for a visualization of what I think
 may happen.

 Can you maybe do something hacky like throwing away a part of the data so
 that processing time gets below one minute, then check whether you still
 get that error?

 Tobias




 On Mon, Jun 30, 2014 at 1:56 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Tobias,

 Thanks for your help. I think in my case, the batch size is 1 minute.
 However, it takes my program more than 1 minute to process 1 minute's
 data. I am not sure whether it is because the unprocessed data pile up.
 Do you have a suggestion on how to check it and solve it? Thanks!

 Bill


 On Sun, Jun 29, 2014 at 7:18 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 were you able to process all information in time, or did maybe some
 unprocessed data pile up? I think when I saw this once, the reason
 seemed to be that I had received more data than would fit in memory,
 while waiting for processing, so old data was deleted. When it was
 time to process that data, it didn't exist any more. Is that a
 possible reason in your case?

 Tobias

 On Sat, Jun 28, 2014 at 5:59 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:
  Hi,
 
  I am running a spark streaming job with 1 minute as the batch size. It
 ran
  around 84 minutes and was killed because of the exception with the
 following
  information:
 
  java.lang.Exception: Could not compute split, block
 input-0-1403893740400
  not found
 
 
  Before it was killed, it was able to correctly generate output for each
  batch.
 
  Any help on this will be greatly appreciated.
 
  Bill
 






Re: Could not compute split, block not found

2014-06-29 Thread Bill Jay
Tobias,

Thanks for your help. I think in my case, the batch size is 1 minute.
However, it takes my program more than 1 minute to process 1 minute's data.
I am not sure whether it is because the unprocessed data pile up. Do you
have a suggestion on how to check it and solve it? Thanks!

Bill


On Sun, Jun 29, 2014 at 7:18 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 were you able to process all information in time, or did maybe some
 unprocessed data pile up? I think when I saw this once, the reason
 seemed to be that I had received more data than would fit in memory,
 while waiting for processing, so old data was deleted. When it was
 time to process that data, it didn't exist any more. Is that a
 possible reason in your case?

 Tobias

 On Sat, Jun 28, 2014 at 5:59 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:
  Hi,
 
  I am running a spark streaming job with 1 minute as the batch size. It
 ran
  around 84 minutes and was killed because of the exception with the
 following
  information:
 
  java.lang.Exception: Could not compute split, block input-0-1403893740400
  not found
 
 
  Before it was killed, it was able to correctly generate output for each
  batch.
 
  Any help on this will be greatly appreciated.
 
  Bill
 



Could not compute split, block not found

2014-06-27 Thread Bill Jay
Hi,

I am running a Spark streaming job with 1 minute as the batch size. It ran for
around 84 minutes and was killed because of an exception with the
following information:

*java.lang.Exception: Could not compute split, block input-0-1403893740400
not found*


Before it was killed, it was able to correctly generate output for each
batch.

Any help on this will be greatly appreciated.

Bill


Re: Spark Streaming RDD transformation

2014-06-26 Thread Bill Jay
Thanks, Sean!

I am currently using foreachRDD to update the global map using data in each
RDD. The reason I want to return the map as an RDD instead of just updating the
map is that RDD provides many handy methods for output. For example, I want
to save the global map into files in HDFS for each batch in the stream. In
this case, do you have any suggestion on how Spark can easily allow me to do
that? Thanks!
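
A hedged sketch of the combination Sean describes below: keep updating the map inside
foreachRDD, then turn a snapshot of it into a small RDD with parallelize and save that
for the batch. The DStream type, "globalMap" and the output path are assumptions made
up for illustration.

import scala.collection.mutable
import org.apache.spark.SparkContext._

// running totals maintained on the driver
val globalMap = mutable.Map[String, Long]()

// assume dstream is a DStream[(String, Long)] of per-key increments
dstream.foreachRDD { rdd =>
  // fold this batch's data into the global map on the driver
  rdd.collectAsMap().foreach { case (k, v) =>
    globalMap(k) = globalMap.getOrElse(k, 0L) + v
  }

  // write out a snapshot of the map for this batch
  val snapshot = rdd.context.parallelize(globalMap.toSeq)
  snapshot.saveAsTextFile("hdfs:///tmp/global-map/" + System.currentTimeMillis)
}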


On Thu, Jun 26, 2014 at 12:26 PM, Sean Owen so...@cloudera.com wrote:

 If you want to transform an RDD to a Map, I assume you have an RDD of
 pairs. The method collectAsMap() creates a Map from the RDD in this
 case.

 Do you mean that you want to update a Map object using data in each
 RDD? You would use foreachRDD() in that case. Then you can use
 RDD.foreach to do something like update a global Map object.

 Not sure if this is what you mean but SparkContext.parallelize() can
 be used to make an RDD from a List or Array of objects. But that's not
 really related to streaming or updating a Map.

 On Thu, Jun 26, 2014 at 1:40 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:
  Hi all,
 
  I am currently working on a project that requires transforming each RDD in a
  DStream to a Map. Basically, when we get a list of data in each batch, we
  would like to update the global map. I would like to return the map as a
  single RDD.
 
  I am currently trying to use the function transform. The output will be an
  RDD of the updated map after each batch. How can I create an RDD from
  another data structure such as Int, Map, etc.? Thanks!
 
  Bill