---------- Forwarded message ----------
From: Daniel Li <daniell...@gmail.com>
Date: Tue, Oct 13, 2015 at 5:14 PM
Subject: [Streaming] join events in last 10 minutes
To: d...@spark.apache.org


We have a scenario where events from three Kafka topics sharing the same
keys need to be merged. One topic carries the master events; most events in
the other two topics arrive within 10 minutes of the master event's arrival.
I wrote pseudo code below. I'd love to hear your thoughts on whether I am on
the right track.

    // Scenario
    //     (1) Merge events from Kafka topic1, topic2 and topic3 sharing
    //         the same keys
    //     (2) Events in topic1 are master events
    //     (3) One master event may have an associated event in topic2 and/or
    //         topic3 sharing the same key
    //     (4) Most events in topic2 and topic3 will arrive within 10
    //         minutes of the master event's arrival
    //
    // Pseudo code
    //     Use a 1-minute window of events from topic1, left-outer-joined with
    //     the trailing 10-minute window of events from topic2 and topic3


    // Parse an event line into a (key, value) pair; the key is the first
    // comma-separated field
    def parse(v: String): (String, String) =
      (v.split(",")(0), v)
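
A side note, not part of the original code: `split(",")(0)` will throw on an empty line, and an event with an empty key would silently join against nothing. A slightly more defensive variant (my own sketch; `parseSafe` is a hypothetical helper name) could drop malformed records via `flatMap`:

```scala
// Sketch: key extraction that tolerates malformed records.
// Returns None when the line has no non-empty comma-separated key.
def parseSafe(v: String): Option[(String, String)] =
  v.split(",", 2) match {
    case Array(key, _) if key.nonEmpty => Some((key, v))
    case _                             => None
  }

// usage: val pairStream1 = stream1.map(_._2).flatMap(parseSafe)
```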

    // Create context with 1 minute batch interval
    val sparkConf = new SparkConf().setAppName("MergeLogs")
    val ssc = new StreamingContext(sparkConf, Minutes(1))
    ssc.checkpoint(checkpointDirectory)

    // Create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val stream1 = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic1"))
    stream1.checkpoint(Minutes(5))
    val pairStream1 = stream1.map(_._2).map(parse)

    val stream2 = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic2"))
    stream2.checkpoint(Minutes(5))
    val pairStream2 = stream2.map(_._2).map(parse)

    val stream3 = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic3"))
    stream3.checkpoint(Minutes(5))
    val pairStream3 = stream3.map(_._2).map(parse)

    // load 1 minute of master events from topic 1
    val windowedStream1 = pairStream1.window(Minutes(1))

    // load sliding 10-minute windows of topic2 and topic3
    val windowedStream2 = pairStream2.window(Minutes(10), Minutes(1))
    val windowedStream3 = pairStream3.window(Minutes(10), Minutes(1))
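
A quick sanity check on the window arithmetic (my own sketch, not from the original post): with a 10-minute window sliding every 1-minute batch, an event that lands in batch t is visible in the windows computed at batches t through t+9, so a child event remains joinable for 10 batches while each 1-minute master window is evaluated only once:

```scala
// Sketch (assumption: batch interval = 1 minute). An event landing in
// batch t is included in every 10-minute window computed at batches
// t .. t+9, i.e. it stays joinable for windowBatches consecutive batches.
def windowBatchesContaining(eventBatch: Int, windowBatches: Int): Range =
  eventBatch until (eventBatch + windowBatches)

val visible = windowBatchesContaining(eventBatch = 5, windowBatches = 10)
```

This is also why the join covers the "arrives within 10 minutes" requirement: each master event is joined once, against every child that arrived in the preceding 10 batches.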

    // left-outer-join topic1 with topic2 and topic3
    val joinedStream =
      windowedStream1.leftOuterJoin(windowedStream2).leftOuterJoin(windowedStream3)
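
One thing worth noting (my own sketch, with String value types assumed for illustration): chaining two `leftOuterJoin`s nests the record shape into `(key, ((master, Option[t2]), Option[t3]))`, so downstream code usually wants a small pattern match to flatten it:

```scala
// Sketch: flatten the nested double-leftOuterJoin result
// (key, ((master, Option[t2]), Option[t3])) into a flat 4-tuple.
type Joined = (String, ((String, Option[String]), Option[String]))

def flatten(r: Joined): (String, String, Option[String], Option[String]) =
  r match { case (key, ((master, t2), t3)) => (key, master, t2, t3) }

val sample: Joined = ("k1", (("masterEvt", Some("t2Evt")), None))
```

In the pipeline this would be e.g. `joinedStream.map(flatten)` before the output stage.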

    // dump merged events; create the connection once per partition on the
    // workers, not at the driver (connection objects don't serialize)
    joinedStream.foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
            val connection = createNewConnection()  // executed at the worker
            partition.foreach(record => connection.send(record))
            connection.close()
        }
    }

    // Start the computation. (To recover from the checkpoint on restart, the
    // setup above would instead go inside a factory function passed to
    // StreamingContext.getOrCreate(checkpointDirectory, factory).)
    ssc.start()
    ssc.awaitTermination()

thx
Daniel