Thanks, I'll look into that.
As for Kafka, I've just used the simplest configuration. You can set it up
using their quickstart, with code like this for the consumer:
    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "test-consumer-group1",
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.offset.reset" -> "smallest")

    // One receiver thread for the "test" topic; keep the message payload, drop the key
    val textStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(_._2)

and like this for the producer:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) =
      Array("localhost:9092", "test", "10", "10")

    // Broker connection properties
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)

    // Send some messages
    while (true) {
      producer.send(new KeyedMessage[String, String](topic, getNextClickEvent()))
      Thread.sleep(10)
    }

(I took that from
org.apache.spark.streaming.examples.clickstream.PageViewGenerator,
org.apache.spark.streaming.examples.clickstream.PageViewStream, and
org.apache.spark.streaming.examples.KafkaWordCount.)
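In case it helps, here's a minimal sketch of how the consumer snippet above can be wired into a complete streaming job, in the spirit of KafkaWordCount. It assumes the same 0.9-era Spark Streaming API; the app name, `local[2]` master, and 2-second batch interval are placeholders, not anything from the examples:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object PageViewCount {
  def main(args: Array[String]) {
    // At least 2 local threads: one for the Kafka receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("PageViewCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "test-consumer-group1")

    val textStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

    // Count the page views received in each batch and print to stdout
    textStream.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

This needs ZooKeeper, a Kafka broker, and the producer loop above all running, so treat it as a starting point rather than something that runs standalone.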



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PageView-streaming-sample-lost-page-views-tp1126p1149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.