GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5918#discussion_r29828894
  
    --- Diff: external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala ---
    @@ -167,26 +167,24 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
         }
       }
     
    -  def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
    +  def writeAndVerify(sinks: Seq[SparkSink], channels: Seq[MemoryChannel], ssc: StreamingContext,
         outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
         val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
         val executor = Executors.newCachedThreadPool()
         val executorCompletion = new ExecutorCompletionService[Void](executor)
    -    channels.map(channel => {
    +
    +    val latch = new CountDownLatch(batchCount * channels.size)
    +    sinks.foreach(_.countdownWhenBatchReceived(latch))
    +
    +    channels.foreach(channel => {
           executorCompletion.submit(new TxnSubmitter(channel, clock))
         })
    +
         for (i <- 0 until channels.size) {
           executorCompletion.take()
         }
    -    val startTime = System.currentTimeMillis()
    -    while (outputBuffer.size < batchCount * channels.size &&
    -      System.currentTimeMillis() - startTime < 15000) {
    -      logInfo("output.size = " + outputBuffer.size)
    -      Thread.sleep(100)
    -    }
    -    val timeTaken = System.currentTimeMillis() - startTime
    -    assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
    -    logInfo("Stopping context")
    +
    +    latch.await(15, TimeUnit.SECONDS)
    --- End diff --
    
    But how is that different from waiting for all the records to be received,
    without waiting for confirmation that the data was actually sent from the
    sink? The likelihood of the data being sent within the time limit is the
    same in both cases, and the time the code may have to wait is not much
    different either.
    
    Rather, my question is: where does the underlying uncertainty about whether
    the data is sent come from? Where is the non-determinism between the sink,
    the Flume channel, and the receiver? The uncertainty must be in the Flume
    components, because we know from other tests that if the receiver receives
    the data, it gets collected through the DStreams very reliably.
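    
    To make the comparison concrete, here is a minimal sketch of the two waiting
    styles. It is not the suite's code: the object name WaitStrategies and both
    helper names are hypothetical, and a plain counter stands in for
    outputBuffer.size. Both helpers give up after the same timeout, so the bound
    on waiting time is the same; they differ only in which event they observe.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helpers for illustration only; not part of FlumePollingStreamSuite.
object WaitStrategies {

  // Polling style (as in the removed loop): re-check a counter until it
  // reaches `expected` or the deadline passes. Returns true on success.
  def pollUntil(received: AtomicInteger, expected: Int, timeoutMs: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (received.get() < expected && System.currentTimeMillis() < deadline) {
      Thread.sleep(10)
    }
    received.get() >= expected
  }

  // Latch style (as in the added line): block until countDown() has been
  // called enough times. CountDownLatch.await returns false on timeout,
  // so the caller has to check the result to notice a timeout.
  def awaitLatch(latch: CountDownLatch, timeoutMs: Long): Boolean =
    latch.await(timeoutMs, TimeUnit.MILLISECONDS)
}
```

    Note that await(timeout, unit) reports a timeout through its Boolean return
    value rather than by throwing, so a caller that wants a timeout to fail the
    test would wrap the call in an assert.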
    
    On Wed, May 6, 2015 at 8:59 PM, Hari Shreedharan <[email protected]>
    wrote:
    
    > In external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
    > <https://github.com/apache/spark/pull/5918#discussion_r29823039>:
    >
    > >      for (i <- 0 until channels.size) {
    > >        executorCompletion.take()
    > >      }
    > > -    val startTime = System.currentTimeMillis()
    > > -    while (outputBuffer.size < batchCount * channels.size &&
    > > -      System.currentTimeMillis() - startTime < 15000) {
    > > -      logInfo("output.size = " + outputBuffer.size)
    > > -      Thread.sleep(100)
    > > -    }
    > > -    val timeTaken = System.currentTimeMillis() - startTime
    > > -    assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
    > > -    logInfo("Stopping context")
    > > +
    > > +    latch.await(15, TimeUnit.SECONDS)
    >
    > The current non-determinism is because we actually don't know if the data
    > was sent completely from the sink at all, not because of the processing
    > factor, which we can control via the manual clock. If we do know, then we
    > know that it will be processed in the next batch. We can simply move the
    > clock ahead by a couple of batch intervals to ensure everything is
    > processed.


