Hi All,

I am new to both Scala and Spark, so please excuse any mistakes.

Setup:

Scala : 2.10.2 
Spark : Apache 1.1.0
Hadoop : Apache 2.4

Intent of the code: to read from a Kafka topic and do some processing.

Below are the code, the build file, and the error I am getting:



import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Created by samyamaiti on 12/25/14.
 */
object Driver {

  def main(args: Array[String]) {

    // Checkpoint dir in HDFS
    val checkpointDirectory = "hdfs://localhost:8020/user/samyamaiti/SparkCheckpoint1"


    // Create a new StreamingContext (used when there is no checkpoint yet)
    def functionToCreateContext(): StreamingContext = {
      // Set up the conf object
      val conf = new SparkConf()
        .setMaster("spark://SamyaMac.local:7077")
        .setAppName("SparkStreamingFileProcessor")

      val ssc = new StreamingContext(conf, Seconds(1))

      // Enable checkpointing
      ssc.checkpoint(checkpointDirectory)
      ssc
    }

    // Get the StreamingContext from checkpoint data, or create a new one
    val sscContext = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    // Accumulator to keep track of the number of messages consumed
    val numInputMessages = sscContext.sparkContext.accumulator(0L, "Kafka messages consumed")

    // Number of consumer threads per input DStream
    val consumerThreadsPerInputDStream = 1

    // Topic map: topic name -> number of consumer threads
    val topics = Map("testTopic" -> consumerThreadsPerInputDStream)

    // ZooKeeper quorum address (plain host:port, no URL scheme)
    val zkQuorum = "localhost:2181"

    // Kafka consumer group id
    val groupId = "sparkFileProcessorGroup"

    // Set up one input DStream per partition of the input topic
    val kafkaDStreams = {

      val numPartitionsOfInputTopic = 1
      val streams = (1 to numPartitionsOfInputTopic) map { _ =>
        KafkaUtils.createStream(sscContext, zkQuorum, groupId, topics).map(_._2)
      }

      // Union the per-partition streams and repartition for processing
      val unifiedStream = sscContext.union(streams)
      val sparkProcessingParallelism = 1
      unifiedStream.repartition(sparkProcessingParallelism)
    }

    // Stream processing pipeline: count the messages received from Kafka.
    // (Eventually this should print the HDFS file name received from Kafka
    // and save it back to HDFS; for now it only counts and prints.)
    kafkaDStreams.map { bytes =>
      numInputMessages += 1
      bytes
    }.foreachRDD { rdd =>
      // count() is an action, so it forces the batch (and the map above) to run
      println("Processed batch of " + rdd.count() + " messages; total: " + numInputMessages.value)
    }

    // Run the streaming job
    sscContext.start()
    sscContext.awaitTermination()

  }
}
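
A side note: my original snippet referenced a kafkaParams map that was never defined, which was one of the mistakes. For reference, the createStream overload that actually takes such a map is the typed one; a minimal sketch of it (the settings and group id below are placeholders, and it assumes the same sscContext and topics values as in the code above):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel

val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",        // ZooKeeper quorum
  "group.id"          -> "sparkFileProcessorGroup" // consumer group (placeholder)
)
val lines = KafkaUtils
  .createStream[String, String, StringDecoder, StringDecoder](
    sscContext, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
  .map(_._2)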


Build.sbt
---------

name := "SparkFileProcessor"

version := "1.0"

scalaVersion := "2.10.2"


libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.0",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0",
  "org.apache.hadoop" % "hadoop-client" % "2.4.0"
)
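
One packaging note on the side: as I understand it, spark-streaming-kafka is not part of the Spark assembly, so it has to be packaged into the application jar, while the core Spark dependency is already on the cluster and could be marked "provided". The variant I have in mind (my assumption, not verified):

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.0" % "provided",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0",
  "org.apache.hadoop" % "hadoop-client" % "2.4.0" % "provided"
)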



Error
-----

14/12/25 23:55:06 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
14/12/25 23:55:06 INFO NettyBlockTransferService: Server created on 56078
14/12/25 23:55:06 INFO BlockManagerMaster: Trying to register BlockManager
14/12/25 23:55:06 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@***.***.***.***:56065] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
14/12/25 23:55:36 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
        at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
        at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)

Any pointers on what could cause the [Disassociated] warning and the subsequent 30-second timeout would be much appreciated.
        
        
Regards,
Sam


