Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16512127
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +95,76 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    +                var seq: CharSequence = null
    +                Try {
    +                  eventBatch = client.getEventBatch(maxBatchSize)
    +                  batchReceived = true
    +                  if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +                    // No error, proceed with processing data
    +                    seq = eventBatch.getSequenceNumber
    +                    val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    +                    logDebug(
    +                      "Received batch of " + events.size() + " events with sequence number: " + seq)
    +                    // Convert each Flume event to a serializable SparkFlumeEvent
    +                    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    +                    var j = 0
    +                    while (j < events.size()) {
    +                      buffer += toSparkFlumeEvent(events(j))
    +                      j += 1
    +                    }
    +                    store(buffer)
    +                    logDebug("Sending ack for sequence number: " + seq)
    +                    // Send an ack to Flume so that Flume discards the events from its channels.
    +                    client.ack(seq)
    +                    logDebug("Ack sent for sequence number: " + seq)
    +                  } else {
    +                    batchReceived = false
    +                    logWarning(
    +                      "Did not receive events from Flume agent due to error on the Flume " +
    +                        "agent: " + eventBatch.getErrorMsg)
                       }
    -                  store(buffer)
    -                  logDebug("Sending ack for sequence number: " + seq)
    -                  // Send an ack to Flume so that Flume discards the events from its channels.
    -                  client.ack(seq)
    -                  logDebug("Ack sent for sequence number: " + seq)
    -                } catch {
    +                }.recover {
                       case e: Exception =>
    -                    try {
    -                      // Let Flume know that the events need to be pushed back into the channel.
    -                      logDebug("Sending nack for sequence number: " + seq)
    -                      client.nack(seq) // If the agent is down, even this could fail and throw
    -                      logDebug("Nack sent for sequence number: " + seq)
    -                    } catch {
    -                      case e: Exception => logError(
    -                        "Sending Nack also failed. A Flume agent is down.")
    +                    // Is the exception an interrupted exception? If yes,
    +                    // check if the receiver was stopped. If the receiver was stopped,
    +                    // simply exit. Else send a Nack and exit.
    +                    if (e.isInstanceOf[InterruptedException]) {
    +                      if (isStopped()) {
    +                        loop.break()
    +                      } else {
    +                        sendNack(batchReceived, client, seq, e)
    +                      }
    +                    }
    +                    // if there is a cause do the same check as above.
    +                    Option(e.getCause) match {
    --- End diff ---
    
    If the interrupt comes in at the time of an Avro RPC call, an AvroRemoteException with an InterruptedException as its cause gets thrown. I am basically checking for InterruptedException itself and for any exception whose cause is an InterruptedException.
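
    For illustration only (this is not code from the PR): a minimal sketch of the cause-chain check described above. A plain RuntimeException stands in for the AvroRemoteException so the snippet needs no Avro dependency, and the helper name is made up.

        import scala.annotation.tailrec

        object InterruptCheck {
          // True if the throwable is an InterruptedException or was
          // (transitively) caused by one, found by walking the cause chain.
          @tailrec
          def causedByInterrupt(t: Throwable): Boolean = t match {
            case null                    => false
            case _: InterruptedException => true
            case other                   => causedByInterrupt(other.getCause)
          }

          def main(args: Array[String]): Unit = {
            // Stand-in for an AvroRemoteException wrapping the interrupt.
            val wrapped = new RuntimeException("RPC failed", new InterruptedException())
            println(causedByInterrupt(wrapped))                   // true
            println(causedByInterrupt(new RuntimeException("x"))) // false
          }
        }

    The diff above writes the two checks (on e and on e.getCause) out separately; a helper like this is just one way to fold them into a single walk of the cause chain.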

