Thanks, I'll look into that.
As for Kafka, I've just used the simplest configuration. You can set it up
following their quickstart, with code like this for the consumer:
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "test-consumer-group1",
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.offset.reset" -> "smallest")
val textStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("test" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
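The textStream above ends up as a DStream[String] of raw event lines (the map(_._2) drops the Kafka message key and keeps the value). As a minimal sketch of what the per-line parsing can look like, assuming a hypothetical tab-separated url/status/userId format (the real PageViewStream example defines its own PageView parser, so treat the field layout here as illustrative only):

```scala
// Hypothetical event format: "url<TAB>status<TAB>userId".
// In the real PageViewStream example, parsing lives in the PageView class.
case class PageView(url: String, status: Int, userId: String)

def parse(line: String): PageView = {
  val Array(url, status, userId) = line.split('\t')
  PageView(url, status.toInt, userId)
}

val pv = parse("http://example.com/foo\t200\tu42")
```

In the streaming job this would simply be textStream.map(parse).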
and like this for the producer:
val Array(brokers, topic, messagesPerSec, wordsPerMessage) =
  Array("localhost:9092", "test", "10", "10")

// Kafka producer connection properties
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)

// Send some messages
while (true) {
  producer.send(new KeyedMessage(topic, getNextClickEvent()))
  Thread.sleep(10)
}
(I took that from
org.apache.spark.streaming.examples.clickstream.PageViewGenerator,
org.apache.spark.streaming.examples.clickstream.PageViewStream, and
org.apache.spark.streaming.examples.KafkaWordCount.)
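One thing to note about the producer loop: messagesPerSec is extracted but never used; the hard-coded Thread.sleep(10) is what actually sets the send rate. A sketch of deriving the sleep interval from messagesPerSec instead (plain Scala, no Kafka dependency; getNextClickEvent is stubbed out here, the real one lives in PageViewGenerator):

```scala
// Stub for the real click-event generator in PageViewGenerator.
def getNextClickEvent(): String = s"click-${System.nanoTime()}"

val messagesPerSec = "10".toInt
val sleepMs = 1000 / messagesPerSec  // 100 ms between sends for 10 msg/s

// The real loop would then be:
// while (true) {
//   producer.send(new KeyedMessage(topic, getNextClickEvent()))
//   Thread.sleep(sleepMs)
// }
```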
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/PageView-streaming-sample-lost-page-views-tp1126p1149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.