GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/2065#discussion_r16683481
--- Diff:
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
---
@@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
// Threads that pull data from Flume.
receiverExecutor.submit(new Runnable {
override def run(): Unit = {
- while (true) {
- val connection = connections.poll()
- val client = connection.client
- try {
- val eventBatch = client.getEventBatch(maxBatchSize)
- if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
- // No error, proceed with processing data
- val seq = eventBatch.getSequenceNumber
- val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
- logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
- try {
- // Convert each Flume event to a serializable SparkFlumeEvent
- val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
- var j = 0
- while (j < events.size()) {
- buffer += toSparkFlumeEvent(events(j))
- j += 1
+ val loop = new Breaks
+ loop.breakable {
+ while (!isStopped()) {
+ val connection = connections.poll()
+ val client = connection.client
+ var batchReceived = false
+ try {
+ var eventBatch: EventBatch = null
+ var seq: CharSequence = null
+ Try {
--- End diff ---
Why are there so many nested Try's and try's in this? The code looks pretty
complex and verbose. I strongly recommend taking another pass; maybe break the
code into smaller functions (with good, semantically meaningful names).
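As a rough illustration of the suggestion, here is a minimal sketch of flattening nested try/Try blocks into small named helpers. The types (`EventBatch`, `Client`) and helpers (`fetchBatch`, `convertEvents`, `receiveOnce`) are hypothetical simplifications for this sketch, not the actual Spark/Flume classes in the diff:

```scala
import scala.util.{Failure, Success, Try}

object ReceiverSketch {
  // Hypothetical, simplified stand-ins for the Flume sink types; the real
  // EventBatch / transceiver client live in the Spark Flume integration.
  final case class EventBatch(sequenceNumber: String, events: Seq[String], isError: Boolean)
  final case class Client(next: EventBatch) {
    def getEventBatch(maxBatchSize: Int): EventBatch = next
  }

  // Step 1: fetch a batch, mapping sink-side error batches to Failure.
  def fetchBatch(client: Client, maxBatchSize: Int): Try[EventBatch] =
    Try(client.getEventBatch(maxBatchSize)).filter(b => !b.isError)

  // Step 2: convert the raw events; conversion failures stay local to this step.
  def convertEvents(batch: EventBatch): Try[Seq[String]] =
    Try(batch.events.map(e => "spark-" + e))

  // The loop body then reads as a short pipeline of named steps instead of
  // nested try / Try blocks.
  def receiveOnce(client: Client, maxBatchSize: Int): Option[Seq[String]] =
    fetchBatch(client, maxBatchSize).flatMap(convertEvents) match {
      case Success(events) => Some(events)
      case Failure(_)      => None
    }
}
```

Each helper owns exactly one failure mode, so the caller composes them with `flatMap` rather than nesting exception handlers.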