GitHub user uncleGen opened a pull request:
https://github.com/apache/spark/pull/2562
[SPARK-3712][STREAMING]: add a new UpdateDStream to update a rdd dynamically
Maybe we can achieve the same goal with the "foreachRDD" function, but it is
awkward that way, because I need to pass a closure, like this:
val baseRDD = ...      // static RDD to combine with each batch
var updatedRDD = ...   // mutable holder for the latest combined result
val inputStream = ...
val func = (rdd: RDD[T], t: Time) => {
  updatedRDD = baseRDD.op(rdd)
}
inputStream.foreachRDD(func)
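For completeness, a fuller, runnable version of this workaround might look like
the sketch below; the socket source, the concrete pair types, and the join
standing in for "op" are illustrative assumptions, not part of this PR:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on Spark 1.x)
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object ForeachRDDWorkaround {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("foreachRDD-workaround").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(1))

    // Static RDD that every batch should be combined with.
    val baseRDD: RDD[(String, Int)] = ssc.sparkContext.parallelize(Seq(("a", 1), ("b", 2)))

    // Mutable holder that the closure overwrites on every batch.
    var updatedRDD: RDD[(String, (Int, Int))] = ssc.sparkContext.emptyRDD

    // Illustrative source: one (line, 1) pair per line received on a socket.
    val inputStream = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))

    // The closure captures both baseRDD and the var; join stands in for "op".
    inputStream.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      updatedRDD = baseRDD.join(rdd)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}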
In my PR, we can update an RDD like this:
val updateStream = inputStream.updateRDD(baseRDD, func).asInstanceOf[UpdateDStream[U, V, T]]
and obtain the updatedRDD like this:
val updatedRDD = updateStream.getUpdatedRDD
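For context, driving the proposed API in an application might look roughly like
the sketch below; it assumes the updateRDD, UpdateDStream, and getUpdatedRDD
names exactly as described above (they exist only in this PR, not in released
Spark), and the type parameters are placeholders:

// Sketch only: relies on the UpdateDStream API proposed in this PR.
val updateStream = inputStream.updateRDD(baseRDD, func).asInstanceOf[UpdateDStream[U, V, T]]

ssc.start()

// After a batch has been processed, the latest combined RDD can be read
// directly from the stream, with no user-side mutable var:
val updatedRDD = updateStream.getUpdatedRDD
updatedRDD.take(10).foreach(println)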
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uncleGen/spark master-clean-14928
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/2562.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2562
----
commit 265c941fe1b7cd164eef11c58f622a0c434a229b
Author: uncleGen <[email protected]>
Date: 2014-09-28T07:48:20Z
[STREAMING]: add a new UpdateDStream to update a rdd dynamically
commit b5cdb62410c3461115e76a9549f160460b63b8fb
Author: uncleGen <[email protected]>
Date: 2014-09-28T10:37:40Z
fix test
commit 41d9a952d39f8bc64a38312856ab57e304a59382
Author: uncleGen <[email protected]>
Date: 2014-09-28T10:40:37Z
clerical error
----