linehrr commented on issue #24461: [SPARK-27434][CORE] Fix mem leak due to 
hadoop fs caching mechanism when eventLog is enabled
URL: https://github.com/apache/spark/pull/24461#issuecomment-488058505
 
 
   Yeah, we run multiple contexts in sequence and that works well.
   However, looking at the patch for HADOOP-12958:
   ```
   -    private class StatisticsDataReference extends PhantomReference<Thread> {
   +    private class StatisticsDataReference extends WeakReference<Thread> {
   ```
   
   We do have that in place, since we are using `hadoop-2.7.3`. To make doubly
   sure, we also checked the runtime jar, and we can see that `hadoop-common` is
   using WeakReference now (taken from the ugly binary Java class file):
   ``` 
Statistics^A^@^LInnerClasses^G^@7^A^@^NStatisticsData^A^@;Lorg/apache/hadoop/fs/FileSystem$Statistics$StatisticsData;^A^@^Fthis$0^A^@,Lorg/apache/hadoop/fs/FileSystem$Statistics;^A^@^F<init>^A^@|(Lorg/apache/hadoop/fs/FileSystem$Statistics;Lorg/ap
    
ache/hadoop/fs/FileSystem$Statistics$StatisticsData;Ljava/lang/Thread;)V^A^@^DCode^A^@^OLineNumberTable^A^@^RLocalVariableTable^A^@^Dthis^A^@^WStatisticsDataReference^A^@DLorg/apache/hadoop/fs/FileSystem$Statistics$StatisticsDataReference;^A^@^Fth
    
read^A^@^RLjava/lang/Thread;^A^@^GgetData^A^@=()Lorg/apache/hadoop/fs/FileSystem$Statistics$StatisticsData;^A^@^GcleanUp^A^@^C()V^A^@^MStackMapTable^G^@3^G^@8^G^@9^A^@
 Signature^A^@1Ljava/lang/ref/WeakReference<Ljava/lang/Thread;>;^A^@
   ```
   
   **But what I wonder now is**: how would WeakReference fix this problem if
   the HashSet still holds a strong reference to this
   `StatisticsDataReference`?
   
   ```
       public static final class Statistics {
           private final Set<FileSystem.Statistics.StatisticsDataReference> allData;
           ...
           public Statistics(FileSystem.Statistics other) {
               ...
               this.allData = new HashSet();
           }

           public FileSystem.Statistics.StatisticsData getThreadStatistics() {
               FileSystem.Statistics.StatisticsData data =
                   (FileSystem.Statistics.StatisticsData)this.threadData.get();
               if (data == null) {
                   data = new FileSystem.Statistics.StatisticsData();
                   this.threadData.set(data);
                   FileSystem.Statistics.StatisticsDataReference ref =
                       new FileSystem.Statistics.StatisticsDataReference(data, Thread.currentThread());
                   synchronized(this) {
                       // the set holds a strong reference to ref
                       this.allData.add(ref);
                   }
               }

               return data;
           }
       }
   ```
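
   To make the question concrete, here is a minimal sketch of the situation described above. The class names `DataRef` and `StrongSetDemo` are hypothetical and this is not Hadoop's code. A `WeakReference` subclass carries a strongly held payload and sits in a `HashSet`; the thread being collected is simulated with `clear()` so the result does not depend on GC timing. The weak referent goes away, but the reference object and its payload stay reachable through the set.

   ```java
   import java.lang.ref.WeakReference;
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical stand-in for StatisticsDataReference: weak on the thread,
   // strong on the per-thread data.
   class DataRef extends WeakReference<Thread> {
       final long[] data; // stands in for StatisticsData

       DataRef(long[] data, Thread thread) {
           super(thread);
           this.data = data;
       }
   }

   class StrongSetDemo {
       static final Set<DataRef> allData = new HashSet<>();

       // Registers a reference for a thread, then simulates the thread
       // being collected by clearing the weak referent explicitly.
       static DataRef registerAndClear() {
           DataRef ref = new DataRef(new long[] {42L}, new Thread(() -> {}));
           allData.add(ref);
           ref.clear(); // get() now returns null, as it would after a GC
           return ref;
       }

       public static void main(String[] args) {
           DataRef ref = registerAndClear();
           System.out.println("referent cleared: " + (ref.get() == null));
           System.out.println("still in set:     " + allData.contains(ref));
           System.out.println("data still held:  " + ref.data[0]);
       }
   }
   ```

   In this sketch the entry only leaves the set if something removes it explicitly, which is the role `cleanUp` would have to play.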
   
   The only way this strong reference from the HashSet could be removed is by
   calling `cleanUp` on `StatisticsDataReference`:
   ```
           private class StatisticsDataReference extends WeakReference<Thread> {
               private final FileSystem.Statistics.StatisticsData data;

               public StatisticsDataReference(FileSystem.Statistics.StatisticsData data, Thread thread) {
                   super(thread, FileSystem.Statistics.STATS_DATA_REF_QUEUE);
                   this.data = data;
               }

               public FileSystem.Statistics.StatisticsData getData() {
                   return this.data;
               }

               public void cleanUp() {
                   synchronized(Statistics.this) {
                       Statistics.this.rootData.add(this.data);
                       Statistics.this.allData.remove(this);
                   }
               }
           }
   ```
   
   But from checking the code, it seems `cleanUp` is never called, so the
   reference would stay held in that HashSet.
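
   One thing worth checking here: the constructor above passes `STATS_DATA_REF_QUEUE` to `super(...)`, and a reference queue is normally paired with some code that polls it and runs the cleanup. Whether `hadoop-common` 2.7.3 contains such a poller is not established in this comment. A minimal sketch of the general pattern follows; `QueueDrainDemo`, `Ref`, `register` and `drain` are hypothetical names, and `enqueue()` is used in place of a real GC cycle so the behaviour is deterministic.

   ```java
   import java.lang.ref.Reference;
   import java.lang.ref.ReferenceQueue;
   import java.lang.ref.WeakReference;
   import java.util.HashSet;
   import java.util.Set;

   class QueueDrainDemo {
       static final ReferenceQueue<Thread> QUEUE = new ReferenceQueue<>();
       static final Set<Ref> allData = new HashSet<>();

       // Hypothetical analogue of StatisticsDataReference.
       static class Ref extends WeakReference<Thread> {
           Ref(Thread t) { super(t, QUEUE); }

           void cleanUp() {
               synchronized (allData) { allData.remove(this); }
           }
       }

       static Ref register(Thread t) {
           Ref ref = new Ref(t);
           synchronized (allData) { allData.add(ref); }
           return ref;
       }

       // Cleans up every reference currently on the queue and
       // returns how many were processed.
       static int drain() {
           int cleaned = 0;
           Reference<? extends Thread> r;
           while ((r = QUEUE.poll()) != null) {
               ((Ref) r).cleanUp();
               cleaned++;
           }
           return cleaned;
       }

       public static void main(String[] args) {
           Ref ref = register(new Thread(() -> {}));
           System.out.println("before:  " + allData.size());
           ref.enqueue(); // simulate the GC enqueuing the reference
           System.out.println("cleaned: " + drain());
           System.out.println("after:   " + allData.size());
       }
   }
   ```

   If nothing ever drains the queue, the set in this sketch keeps every entry, which matches the concern raised above.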
   
   In the meantime, a new ref is created whenever a thread calls this for the first time (that is, whenever `threadData` is still empty for the calling thread):
   ```        
           public FileSystem.Statistics.StatisticsData getThreadStatistics() {
               FileSystem.Statistics.StatisticsData data =
                   (FileSystem.Statistics.StatisticsData)this.threadData.get();
               if (data == null) {
                   data = new FileSystem.Statistics.StatisticsData();
                   this.threadData.set(data);
                   FileSystem.Statistics.StatisticsDataReference ref =
                       new FileSystem.Statistics.StatisticsDataReference(data, Thread.currentThread());
                   synchronized(this) {
                       this.allData.add(ref);
                   }
               }

               return data;
           }
   ```
   That method is called in class `SparkHadoopUtil` by
   ```
     private[spark] def getFSBytesWrittenOnThreadCallback(): () => Long = {
       val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
       val f = () => threadStats.map(_.getBytesWritten).sum
       val baselineBytesWritten = f()
       () => f() - baselineBytesWritten
     }
   ```
   and 
   ```
     private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
       val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
       val baseline = (Thread.currentThread().getId, f())
       // ... (rest of the method not quoted here)
   ```
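
   The growth pattern implied by these two callbacks can be sketched as follows. `PerThreadStats` is a hypothetical class, not Spark or Hadoop code, and it leaves out any cleanup on purpose. Each thread that calls `getThreadStatistics` for the first time adds one entry to the shared set, and repeated calls from the same thread add nothing, so the set size tracks the number of distinct threads seen so far.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   class PerThreadStats {
       private final ThreadLocal<long[]> threadData = new ThreadLocal<>();
       private final Set<long[]> allData = new HashSet<>();

       long[] getThreadStatistics() {
           long[] data = threadData.get();
           if (data == null) {
               // First call on this thread: create and register an entry.
               data = new long[1];
               threadData.set(data);
               synchronized (this) { allData.add(data); }
           }
           return data;
       }

       synchronized int trackedEntries() { return allData.size(); }

       // Runs `threads` short-lived threads one after another and
       // returns how many entries the shared set holds afterwards.
       static int entriesAfter(int threads) {
           PerThreadStats stats = new PerThreadStats();
           try {
               for (int i = 0; i < threads; i++) {
                   Thread t = new Thread(() -> {
                       stats.getThreadStatistics();
                       stats.getThreadStatistics(); // second call adds nothing
                   });
                   t.start();
                   t.join();
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return stats.trackedEntries();
       }

       public static void main(String[] args) {
           System.out.println(entriesAfter(5));
       }
   }
   ```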
   
   Can anyone verify whether my finding makes sense?
   
   

