Hi Vinti,

Based on the screenshots provided, all of your tasks are failing.

I think a few more details would help. Is this a YARN or a Standalone
cluster? How much memory does the cluster have overall, and how much is
on each machine where workers and executors are running? Are you using the
Direct (KafkaUtils.createDirectStream) or the Receiver
(KafkaUtils.createStream) approach?
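
For reference, the two approaches look roughly like the sketch below. It
assumes Spark 1.x with the spark-streaming-kafka (Kafka 0.8) artifact on the
classpath; the object name, method names, and parameters are hypothetical and
not taken from your program.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper object, for illustration only.
object KafkaInputSketch {

  // Direct approach: no receivers; each Kafka partition maps to one
  // RDD partition in every batch.
  def direct(ssc: StreamingContext, brokers: String, topic: String): DStream[String] =
    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, Map("metadata.broker.list" -> brokers), Set(topic))
      .map(_._2) // keep only the message value

  // Receiver approach: a long-running receiver task occupies one
  // executor core for the lifetime of the stream.
  def receiver(ssc: StreamingContext, zkQuorum: String,
               group: String, topic: String): DStream[String] =
    KafkaUtils
      .createStream(ssc, zkQuorum, group, Map(topic -> 1))
      .map(_._2) // keep only the message value
}
```

The distinction matters with single-core executors, since each receiver
would hold an executor's only core and leave it unavailable for processing.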

You may find this Stack Overflow discussion useful:
http://stackoverflow.com/questions/28901123/org-apache-spark-shuffle-metadatafetchfailedexception-missing-an-output-locatio

-Todd

On Mon, Mar 7, 2016 at 5:52 PM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Hi,
>
> My spark-streaming program seems very slow. I am using Ambari for cluster
> setup and Kafka for data input.
> I tried a batch size of 2 secs and a checkpointing duration of 10 secs. But
> since the scheduling delay kept increasing, I tried increasing the batch
> size to 5 and then 10 secs. Nothing seems to have changed in terms of
> performance.
>
> *My program is doing two tasks:*
>
> 1) Data aggregation
>
> 2) Data insertion into Hbase
>
> The action that took the most time was the foreachRDD call on the DStream
> object (state).
>
> *state.foreachRDD(rdd => rdd.foreach(Blaher.blah))*
>
>
>
>
> *Program sample input coming from kafka:*
> test_id, file1, 1,1,1,1,1
>
> *Code snippets:*
>
> val parsedStream = inputStream
>   .map(line => {
>     val splitLines = line.split(",")
>     (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
>   })
> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>   (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>     prev.map(_ +: current).orElse(Some(current))
>       .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>   })
> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
>
>
> object Blaher {
>   def blah(tup: (String, Array[Long])) {
>     val hConf = HBaseConfiguration.create()
>     ------
>     val hTable = new HTable(hConf, tableName)
>     val thePut = new Put(Bytes.toBytes("file_data"))
>     thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(tup._1),
>       Bytes.toBytes(tup._2.toList.toString))
>     new ImmutableBytesWritable(Bytes.toBytes("file_data"))
>
>     hTable.put(thePut)
>   }
> }
>
>
> *My Cluster Specifications:*
> 16 executors ( 1 core each and 2g memory)
>
> I have attached some screenshots of running execution.
>
> Does anyone have an idea what changes I should make to speed up the
> processing?
>
> Thanks & Regards,
>
> Vinti
>
>
>
