Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16685628
  
    --- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
 ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = 
eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with 
sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable 
SparkFlumeEvent
    -                  val buffer = new 
ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    +                var seq: CharSequence = null
    +                Try {
    +                  eventBatch = client.getEventBatch(maxBatchSize)
    +                  batchReceived = true
    +                  if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +                    // No error, proceed with processing data
    +                    seq = eventBatch.getSequenceNumber
    +                    val events: java.util.List[SparkSinkEvent] = 
eventBatch.getEvents
    +                    logDebug(
    +                      "Received batch of " + events.size() + " events with 
sequence number: " + seq)
    +                    // Convert each Flume event to a serializable 
SparkFlumeEvent
    +                    val buffer = new 
ArrayBuffer[SparkFlumeEvent](events.size())
    +                    var j = 0
    +                    while (j < events.size()) {
    +                      buffer += toSparkFlumeEvent(events(j))
    +                      j += 1
    +                    }
    +                    store(buffer)
    +                    logDebug("Sending ack for sequence number: " + seq)
    +                    // Send an ack to Flume so that Flume discards the 
events from its channels.
    +                    client.ack(seq)
    +                    logDebug("Ack sent for sequence number: " + seq)
    +                  } else {
    +                    batchReceived = false
    +                    logWarning(
    +                      "Did not receive events from Flume agent due to 
error on the Flume " +
    +                        "agent: " + eventBatch.getErrorMsg)
                       }
    -                  store(buffer)
    -                  logDebug("Sending ack for sequence number: " + seq)
    -                  // Send an ack to Flume so that Flume discards the 
events from its channels.
    -                  client.ack(seq)
    -                  logDebug("Ack sent for sequence number: " + seq)
    -                } catch {
    +                }.recoverWith {
                       case e: Exception =>
    -                    try {
    -                      // Let Flume know that the events need to be pushed 
back into the channel.
    -                      logDebug("Sending nack for sequence number: " + seq)
    -                      client.nack(seq) // If the agent is down, even this 
could fail and throw
    -                      logDebug("Nack sent for sequence number: " + seq)
    -                    } catch {
    -                      case e: Exception => logError(
    -                        "Sending Nack also failed. A Flume agent is down.")
    +                    Try {
    +                      Throwables.getRootCause(e) match {
    +                        // If the cause was an InterruptedException,
    +                        // then check if the receiver is stopped - if yes,
    +                        // just break out of the loop. Else send a Nack and
    +                        // log a warning.
    +                        // In the unlikely case, the cause was not an 
Exception,
    +                        // then just throw it out and exit.
    +                        case interrupted: InterruptedException =>
    +                          if (isStopped()) {
    +                            loop.break()
    +                          } else {
    +                            logWarning("Interrupted while receiving data 
from Flume", interrupted)
    +                            sendNack(batchReceived, client, seq)
    +                          }
    +                        case exception: Exception =>
    +                          logWarning("Error while receiving data from 
Flume", exception)
    +                          sendNack(batchReceived, client, seq)
    +                        case majorError: Throwable =>
    --- End diff --
    
    Do not catch Throwables. Scala subsystems often use throwables for variable
control flow, and catching `Throwable` can have unpredictable consequences.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to