Subject: [Streaming] join events in last 10 minutes

We have a scenario in which events from three Kafka topics that share the
same keys need to be merged. One topic carries the master events; most
events in the other two topics arrive within 10 minutes of the corresponding
master event. I wrote some pseudo code below and would love to hear your
thoughts on whether I am on the right track.
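To pin down what "merged" means here, the following is a minimal plain-Scala sketch of the intended semantics. It is not a Spark implementation. It assumes each event carries its arrival time in milliseconds, and `Event`, `mergeWithin` and `TenMinutesMs` are hypothetical names used only for illustration. Each master event is paired with the topic2/topic3 events that share its key and arrive at most 10 minutes after it. A master with no match is still kept, as in a left outer join.

```scala
// Plain-Scala sketch (no Spark) of the intended merge semantics.
// Assumption: every event carries its arrival time in milliseconds.
object MergeSemantics {
  final case class Event(key: String, arrivalMs: Long, payload: String)

  // Hypothetical constant for the 10-minute matching horizon.
  val TenMinutesMs: Long = 10L * 60 * 1000

  // For each master event, collect the topic2/topic3 events with the same key
  // that arrive no earlier than the master and at most `withinMs` later.
  // Masters without any match are kept (left-outer-join semantics).
  def mergeWithin(
      masters: Seq[Event],
      topic2: Seq[Event],
      topic3: Seq[Event],
      withinMs: Long = TenMinutesMs
  ): Seq[(Event, Seq[Event], Seq[Event])] = {
    def matches(m: Event, others: Seq[Event]): Seq[Event] =
      others.filter { e =>
        e.key == m.key &&
        e.arrivalMs >= m.arrivalMs &&
        e.arrivalMs - m.arrivalMs <= withinMs
      }
    masters.map(m => (m, matches(m, topic2), matches(m, topic3)))
  }
}
```

For example, a topic2 event arriving 3 minutes after its master is matched, while a topic3 event arriving 11 minutes after it is not.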

    // Scenario
    //     (1) Merge events from Kafka topic1, topic2 and topic3 that share
    //         the same keys
    //     (2) Events in topic1 are master events
    //     (3) One master event may have an associated event in topic2
    //         and/or topic3 sharing the same key
    //     (4) Most events in topic2 and topic3 arrive within 10 minutes
    //         of the master event's arrival
    // Pseudo code
    //     Use a 1-minute window of events in topic1 to left-outer-join
    //     with the next 10 minutes of events from topic2 and topic3

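    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Parse an event into a (key, event) pair; the key is the first
    // comma-separated field
    def parse(v: String): (String, String) = {
        (v.split(",")(0), v)
    }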

    // Create context with 1 minute batch interval
    val sparkConf = new SparkConf().setAppName("MergeLogs")
    val ssc = new StreamingContext(sparkConf, Minutes(1))

    // Create a direct Kafka stream per topic from the broker list
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

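    val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic1"))
    // Each Kafka record is a (key, message) tuple; re-key it by the parsed message
    val pairStream1 = stream1.map { case (_, msg) => parse(msg) }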

    val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic2"))
    val pairStream2 = stream2.map { case (_, msg) => parse(msg) }

    val stream3 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic3"))
    val pairStream3 = stream3.map { case (_, msg) => parse(msg) }

    // Take 1 minute of master events from topic1 (with a 1-minute batch
    // interval this is simply the current batch)
    val windowedStream1 = pairStream1.window(Minutes(1))

    // Take the last 10 minutes of topic2 and topic3, sliding every minute
    val windowedStream2 = pairStream2.window(Minutes(10), Minutes(1))
    val windowedStream3 = pairStream3.window(Minutes(10), Minutes(1))

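    // Left-outer-join topic1 with topic2 and topic3; each output value is
    // (master event, Option[topic2 event], Option[topic3 event])
    val joinedStream = windowedStream1
      .leftOuterJoin(windowedStream2)
      .leftOuterJoin(windowedStream3)
      .mapValues { case ((e1, e2), e3) => (e1, e2, e3) }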

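    // Dump merged events. createNewConnection() is an assumed helper, not
    // defined here. It is called inside foreachPartition so the connection is
    // opened on the worker; a connection created directly in foreachRDD lives
    // on the driver and would have to be serialized to the workers.
    joinedStream.foreachRDD { rdd =>
        rdd.foreachPartition { records =>
            val connection = createNewConnection()  // executed at the worker
            records.foreach(record => connection.send(record))
            connection.close()
        }
    }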
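    // Start the computation. StreamingContext.getOrCreate can only restore
    // from checkpointDirectory if all of the DStream setup above is moved into
    // the factory function passed to it, so this sketch starts ssc directly.
    ssc.checkpoint(checkpointDirectory)
    ssc.start()
    ssc.awaitTermination()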
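One thing worth checking against the "next 10 minutes" goal: `window(Minutes(10), Minutes(1))` covers the ten minutes ending at the current batch, so it looks backward, not forward. The plain-Scala simulation below (no Spark) mimics one batch of the topic1/topic2 join; topic3 behaves the same way. It assumes time is counted in whole 1-minute batches and that an event arriving in minute t lands in batch t. `Event`, `inWindow` and `joinAtBatch` are hypothetical names for illustration only.

```scala
// Plain-Scala simulation (no Spark) of the windowed left outer join.
// Assumptions: time is counted in whole 1-minute batches, and an event
// arriving during minute t belongs to batch t.
object WindowedJoinSim {
  final case class Event(key: String, batch: Int, payload: String)

  // Events visible to a window of n batches evaluated at batch t:
  // batches t-n+1 .. t, i.e. the window looks BACKWARD from batch t.
  def inWindow(events: Seq[Event], t: Int, n: Int): Seq[Event] =
    events.filter(e => e.batch > t - n && e.batch <= t)

  // At batch t: masters from a 1-batch window left-outer-joined with the
  // last 10 batches of topic2. Like leftOuterJoin, each master is emitted
  // once per matching right-side event, or once with None if there is none.
  def joinAtBatch(masters: Seq[Event], topic2: Seq[Event], t: Int)
      : Seq[(String, (String, Option[String]))] = {
    val left = inWindow(masters, t, 1)
    val right = inWindow(topic2, t, 10)
    left.flatMap { m =>
      val hits = right.filter(_.key == m.key)
      if (hits.isEmpty) Seq((m.key, (m.payload, Option.empty[String])))
      else hits.map(h => (m.key, (m.payload, Option(h.payload))))
    }
  }
}
```

Under these assumptions a topic2 event that arrives three minutes after its master is never paired with it, because by then the master has left its 1-minute window; a topic2 event from three minutes before the master is matched instead. Reaching forward would mean windowing the master stream over 10 minutes (which re-emits each master on every slide), or keeping per-key state with an operation such as `updateStateByKey`.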


