Are there any examples of how to use StateStore with DStreams?  It seems
like the idea would be to create a new version with each minibatch, but I
don't quite know how to make that happen.  My lame attempt is below.

  /** Wires a socket text DStream into a Spark `StateStore`, maintaining a
    * running count per key across micro-batches. Input lines are expected
    * as "key value" pairs separated by a single space.
    *
    * NOTE(review): `keySchema`, `valueSchema`, and the implicit that adds
    * `mapPartitionsWithStateStore` are assumed to be in scope elsewhere in
    * this file — confirm.
    *
    * @param ss the active SparkSession whose SQLContext backs the store
    */
  def run(ss: SparkSession): Unit = {
    val c = new StreamingContext(ss.sparkContext, Seconds(2))
    val stm = c.socketTextStream("localhost", 9999)

    // Store version to load for the next batch. This lives on the DRIVER.
    // It must not be assigned from inside the executor-side closure below:
    // Spark serializes closures, so an executor-side write only mutates a
    // copy and the driver would keep reloading version 0 forever (that was
    // the bug in the original attempt, which did `version = store.commit()`
    // inside the partition function).
    var version = 0L

    stm.foreachRDD { (rdd, t) =>
      val data = rdd
        .map { s =>
          val Array(k, v) = s.split(" ")
          (k, v)
        }
        .mapPartitionsWithStateStore(ss.sqlContext, "/Users/msmith/cp", 1,
          version, keySchema, valueSchema) { (store, rows) =>
          // Build the projections ONCE per partition, not once per row:
          // UnsafeProjection.create generates and compiles code, which is
          // far too expensive to repeat for every record.
          val keyProj = UnsafeProjection.create(keySchema)
          val valueProj = UnsafeProjection.create(valueSchema)

          val updated = rows.map { case (k, v) =>
            val keyURow = keyProj(InternalRow(UTF8String.fromString(k)))

            // Running total = previously stored count (0 if the key is
            // absent) plus the increment carried by this record.
            val total =
              store.get(keyURow).map(_.getLong(0)).getOrElse(0L) + v.toLong
            store.put(keyURow, valueProj(InternalRow(total)))
            (k, total)
          }

          // Lazily-appended empty tail: it is evaluated only after `updated`
          // has been fully consumed, which is exactly when it becomes safe
          // to commit this partition's delta. The version returned by
          // commit() is deliberately discarded here — it cannot be handed
          // back to the driver through a captured var (see note above).
          lazy val commitTail: Option[(String, Long)] = {
            store.commit()
            None
          }

          updated ++ commitTail
        }

      // collectAsMap() is the action that actually executes the partition
      // functions above — and therefore triggers the commits.
      println(data.collectAsMap())

      // Each successful batch commits exactly one new store version, so
      // advance the driver-side counter here, after the action completes.
      version += 1
    }

    c.start()             // Start the computation
    c.awaitTermination()  // Wait for the computation to terminate
  }

Reply via email to