GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/6034#discussion_r30388035
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
@@ -302,35 +347,22 @@ abstract class DStream[T: ClassTag] (
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
- // Set the thread-local property for call sites to this DStream's creation site
- // such that RDDs generated by compute gets that as their creation site.
- // Note that this `getOrCompute` may get called from another DStream which may have
- // set its own call site. So we store its call site in a temporary variable,
- // set this DStream's creation site, generate RDDs and then restore the previous call site.
- val prevCallSite = ssc.sparkContext.getCallSite()
- ssc.sparkContext.setCallSite(creationSite)
- // Disable checks for existing output directories in jobs launched by the streaming
- // scheduler, since we may need to write output to an existing directory during checkpoint
- // recovery; see SPARK-4835 for more details. We need to have this call here because
- // compute() might cause Spark jobs to be launched.
- val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
- compute(time)
- }
- ssc.sparkContext.setCallSite(prevCallSite)
+ val newRDD = doCompute(time)
--- End diff --
This should keep `Option` in the name, as before (cf. the old `rddOption`).
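The removed lines follow a common pattern. They save the caller's call site, install this DStream's creation site, and compute with output-spec validation disabled through a `DynamicVariable`. Then they restore the previous call site. The minimal sketch below illustrates that pattern and the naming point above. Every name in it is a hypothetical stand-in and not a Spark API: `CallSiteSketch`, its `getCallSite`/`setCallSite`, `compute`, and this `doCompute` signature. A `String` stands in for an RDD. `scala.util.DynamicVariable` is the real standard-library class. The quoted diff restores the call site without `finally`; restoring it in a `finally` block is this sketch's own choice.

```scala
import scala.util.DynamicVariable

// Hypothetical stand-ins for Spark's call-site and output-spec machinery (not Spark APIs).
object CallSiteSketch {
  // Plays the role of the SparkContext's current call-site property.
  private var currentCallSite: String = "none"
  def getCallSite(): String = currentCallSite
  def setCallSite(site: String): Unit = currentCallSite = site

  // Analogous in spirit to PairRDDFunctions.disableOutputSpecValidation.
  val disableOutputSpecValidation = new DynamicVariable[Boolean](false)

  // Hypothetical compute: None means "nothing generated for this time".
  def compute(time: Long): Option[String] =
    if (time % 2 == 0)
      Some(s"rdd@$time site=${getCallSite()} skipCheck=${disableOutputSpecValidation.value}")
    else None

  // Save the caller's site, install this stream's creation site, compute with
  // validation disabled, then restore the previous site (in finally, by choice).
  def doCompute(time: Long, creationSite: String): Option[String] = {
    val prevCallSite = getCallSite()
    setCallSite(creationSite)
    try {
      disableOutputSpecValidation.withValue(true) {
        compute(time)
      }
    } finally {
      setCallSite(prevCallSite)
    }
  }

  def main(args: Array[String]): Unit = {
    setCallSite("caller")
    // "Option" stays in the name because the result may be None.
    val newRDDOption = doCompute(4L, "stream-creation-site")
    println(newRDDOption) // Some(rdd@4 site=stream-creation-site skipCheck=true)
    println(getCallSite()) // caller
  }
}
```

The `Option` suffix tells the reader at the call site that `doCompute` may produce nothing for a given batch time. A plain `newRDD` hides that.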