Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/247#discussion_r11367954
--- Diff:
streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
---
@@ -215,4 +271,29 @@ class StreamingContextSuite extends FunSuite with
BeforeAndAfter with Timeouts {
}
}
-class TestException(msg: String) extends Exception(msg)
\ No newline at end of file
+class TestException(msg: String) extends Exception(msg)
+
+/** Custom receiver for testing whether all data received by a receiver
gets processed or not */
+class TestReceiver extends NetworkReceiver[Int] {
+ protected lazy val blockGenerator = new
BlockGenerator(StorageLevel.MEMORY_ONLY)
+ protected def onStart() {
+ blockGenerator.start()
+ logInfo("BlockGenerator started on thread " + receivingThread)
+ try {
+ while(true) {
+ blockGenerator += TestReceiver.counter.getAndIncrement
+ Thread.sleep(0)
+ }
+ } finally {
+ logInfo("Receiving stopped at count value of " +
TestReceiver.counter.get())
+ }
+ }
+
+ protected def onStop() {
+ blockGenerator.stop()
+ }
+}
+
+object TestReceiver {
+ val counter = new AtomicInteger(1)
+}
--- End diff --
nit: need new line here
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---