Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16753967
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -124,6 +155,13 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       def shutdown() {
         logInfo("Shutting down Spark Avro Callback Handler")
    +    stopped = true
    +    activeProcessorMapLock.lock()
    +    try {
    +      activeProcessors.foreach(_.shutdown())
    +    } finally {
    +      activeProcessorMapLock.unlock()
    --- End diff --
    
    There can still be a race condition, I think! Another thread can be stuck 
at line 81 while the shutting-down thread enters its critical section and marks 
all the processors for shutdown. Then, when the shutting-down thread exits its 
critical section, the other thread (stuck at line 81) will enter its critical 
section and create a new transaction processor, which will never be shut down. 
    
    Isn't it?
    
    More fundamentally, why do we even need `activeProcessors`? We can add 
every processor we create to the `sequenceNumberToProcessor` map, even before 
the processor has been submitted for populating events. At the time of 
shutdown, we can do `sequenceNumberToProcessor.values.foreach(_.shutdown)` to 
shut down all of them. This would be easier to reason about with regard to race 
conditions, because then we don't have to add a processor to two different data 
structures (`activeProcessors` and `sequenceNumberToProcessor`) in two 
different steps (before populateEvents and after). 
    
    And in that case it becomes easier to synchronize. Let there be two methods 
with the following semantics.
    ```
    def createProcessor(): Option[TransactionProcessor] =
      sequenceNumberToProcessor.synchronized {
        if (!stopped) {
          // create processor, add to the map, and return Some(proc)
        } else None
      }
    
    def removeAndGetProcessor(seqNum: CharSequence): TransactionProcessor =
      sequenceNumberToProcessor.synchronized {
        // remove and return within the lock
      }
    
    def shutdown(): Unit = {
      sequenceNumberToProcessor.synchronized {
        stopped = true
        // shutdown all the processors
      }
    }
    ```
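
    To make the idea concrete, here is a minimal, self-contained sketch of the 
single-map approach. It is only an illustration under assumptions: 
`TransactionProcessor` is a hypothetical stand-in that just records whether it 
was shut down, `CallbackHandlerSketch` and the counter-based sequence numbers 
are invented for the example, and `removeAndGetProcessor` returns an `Option` 
here (rather than a bare processor) so that an unknown sequence number does 
not throw.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the real TransactionProcessor; it only records
// whether shutdown() has been called.
class TransactionProcessor(val seqNum: String) {
  @volatile private var shutDown = false
  def shutdown(): Unit = { shutDown = true }
  def isShutdown: Boolean = shutDown
}

// Sketch of a handler that keeps every processor in one map behind one lock.
class CallbackHandlerSketch {
  private val sequenceNumberToProcessor =
    mutable.HashMap.empty[String, TransactionProcessor]
  // Both fields below are guarded by sequenceNumberToProcessor's monitor.
  private var stopped = false
  private var counter = 0L // assumption: sequence numbers come from a counter

  // Register the processor before any events are populated, unless stopped.
  def createProcessor(): Option[TransactionProcessor] =
    sequenceNumberToProcessor.synchronized {
      if (!stopped) {
        counter += 1
        val proc = new TransactionProcessor(counter.toString)
        sequenceNumberToProcessor(proc.seqNum) = proc
        Some(proc)
      } else {
        None
      }
    }

  // Remove and return under the lock; None if the sequence number is unknown.
  def removeAndGetProcessor(seqNum: String): Option[TransactionProcessor] =
    sequenceNumberToProcessor.synchronized {
      sequenceNumberToProcessor.remove(seqNum)
    }

  // Setting the flag and shutting down the processors happen in the same
  // critical section, so no processor can be created in between.
  def shutdown(): Unit = sequenceNumberToProcessor.synchronized {
    stopped = true
    sequenceNumberToProcessor.values.foreach(_.shutdown())
  }
}
```

    Because the `stopped` check and the map insertion happen under the same 
lock that `shutdown()` holds, a thread that was waiting on the lock during 
shutdown sees `stopped == true` once it gets in and receives `None` instead of 
creating a processor.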


