linehrr commented on issue #24461: [SPARK-27434][CORE] Fix mem leak due to hadoop fs caching mechanism when eventLog is enabled
URL: https://github.com/apache/spark/pull/24461#issuecomment-488058505

Yeah, we run multiple contexts in sequence and that works well. However, looking at this patch from HADOOP-12958:

```diff
-  private class StatisticsDataReference extends PhantomReference<Thread> {
+  private class StatisticsDataReference extends WeakReference<Thread> {
```

we already have that in place, since we are using `hadoop-2.7.3`. To double-check, we also inspected the runtime jar, and `hadoop-common` is indeed using `WeakReference` now (from the ugly binary Java class):

```
Statistics^A^@^LInnerClasses^G^@7^A^@^NStatisticsData^A^@;Lorg/apache/hadoop/fs/FileSystem$Statistics$StatisticsData;^A^@^Fthis$0^A^@,Lorg/apache/hadoop/fs/FileSystem$Statistics;^A^@^F<init>^A^@|(Lorg/apache/hadoop/fs/FileSystem$Statistics;Lorg/apache/hadoop/fs/FileSystem$Statistics$StatisticsData;Ljava/lang/Thread;)V^A^@^DCode^A^@^OLineNumberTable^A^@^RLocalVariableTable^A^@^Dthis^A^@^WStatisticsDataReference^A^@DLorg/apache/hadoop/fs/FileSystem$Statistics$StatisticsDataReference;^A^@^Fthread^A^@^RLjava/lang/Thread;^A^@^GgetData^A^@=()Lorg/apache/hadoop/fs/FileSystem$Statistics$StatisticsData;^A^@^GcleanUp^A^@^C()V^A^@^MStackMapTable^G^@3^G^@8^G^@9^A^@ Signature^A^@1Ljava/lang/ref/WeakReference<Ljava/lang/Thread;>;^A^@
```

**But here's what I see now:** how would `WeakReference` fix this problem if the `HashSet` still holds a strong reference to this `StatisticsDataReference`?

```java
public static final class Statistics {
    private final Set<FileSystem.Statistics.StatisticsDataReference> allData;
    ...
    public Statistics(FileSystem.Statistics other) {
        ...
        this.allData = new HashSet();
    }

    public FileSystem.Statistics.StatisticsData getThreadStatistics() {
        FileSystem.Statistics.StatisticsData data = (FileSystem.Statistics.StatisticsData)this.threadData.get();
        if (data == null) {
            data = new FileSystem.Statistics.StatisticsData();
            this.threadData.set(data);
            FileSystem.Statistics.StatisticsDataReference ref = new FileSystem.Statistics.StatisticsDataReference(data, Thread.currentThread());
            synchronized(this) {
                this.allData.add(ref);
            }
        }
        return data;
    }
```

The only place this strong reference from the `HashSet` can be removed is `cleanUp()` on `StatisticsDataReference`:

```java
private class StatisticsDataReference extends WeakReference<Thread> {
    private final FileSystem.Statistics.StatisticsData data;

    public StatisticsDataReference(FileSystem.Statistics.StatisticsData data, Thread thread) {
        super(thread, FileSystem.Statistics.STATS_DATA_REF_QUEUE);
        this.data = data;
    }

    public FileSystem.Statistics.StatisticsData getData() {
        return this.data;
    }

    public void cleanUp() {
        synchronized(Statistics.this) {
            Statistics.this.rootData.add(this.data);
            Statistics.this.allData.remove(this);
        }
    }
}
```

But from what I can see in the code, it's never called, so the reference stays in that `HashSet`.
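To make the question concrete, here is a minimal Java sketch of the same pattern. `StatsRegistry`, `DataRef`, `register`, and `drainQueue` are hypothetical names, not Hadoop's API. It shows that clearing a `WeakReference` does not remove the reference object from a `HashSet` that holds it: the set keeps the ref, and the ref keeps its `data` payload, until something polls the `ReferenceQueue` and removes it, which is the role `cleanUp()` plays above. To stay deterministic, the sketch assumes we simulate the collector by calling `clear()` and `enqueue()` by hand rather than relying on `System.gc()`.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for FileSystem.Statistics, reduced to the reference bookkeeping.
public class StatsRegistry {
    // Per-thread payload; plays the role of StatisticsData.
    static final class Data {
        long bytesRead;
    }

    // Weakly references the owning thread, but strongly references its payload.
    final class DataRef extends WeakReference<Thread> {
        final Data data;

        DataRef(Data data, Thread thread, ReferenceQueue<Thread> q) {
            super(thread, q);
            this.data = data;
        }

        // Mirrors cleanUp(): the only path that removes the ref from allData.
        // (Hadoop's version also folds data into rootData; omitted here.)
        void cleanUp() {
            synchronized (StatsRegistry.this) {
                allData.remove(this);
            }
        }
    }

    final ReferenceQueue<Thread> queue = new ReferenceQueue<>();
    final Set<DataRef> allData = new HashSet<>();

    // Strongly registers a new ref, like allData.add(ref) in getThreadStatistics().
    synchronized DataRef register(Thread thread) {
        DataRef ref = new DataRef(new Data(), thread, queue);
        allData.add(ref);
        return ref;
    }

    // Polls refs that have been enqueued and calls cleanUp() on each one.
    int drainQueue() {
        int cleaned = 0;
        Reference<? extends Thread> r;
        while ((r = queue.poll()) != null) {
            ((DataRef) r).cleanUp();
            cleaned++;
        }
        return cleaned;
    }

    synchronized int size() {
        return allData.size();
    }
}
```

After the ref is cleared and enqueued, `ref.get()` returns `null`, yet `size()` is still 1 and `ref.data` is still reachable; only once `drainQueue()` calls `cleanUp()` does `size()` drop to 0. If nothing ever drains the queue, the entries simply stay.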
Meanwhile, a new ref is created whenever `getThreadStatistics()` (quoted above) runs on a thread that doesn't have a `StatisticsData` yet. Since `threadData` is a `ThreadLocal`, the first call on every new thread adds one more entry to `allData`. `getThreadStatistics()` is called in `SparkHadoopUtil` by:

```scala
private[spark] def getFSBytesWrittenOnThreadCallback(): () => Long = {
  val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
  val f = () => threadStats.map(_.getBytesWritten).sum
  val baselineBytesWritten = f()
  () => f() - baselineBytesWritten
}
```

and

```scala
private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
  val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
  val baseline = (Thread.currentThread().getId, f())
  ...
```

Can anyone verify whether my finding makes sense?
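To illustrate the per-thread growth, here is a minimal Java sketch that models only the `ThreadLocal` plus shared-set pattern of `getThreadStatistics()`. `PerThreadStats`, `Data`, and `simulate` are hypothetical names, and the reference queue and any cleanup are deliberately left out to isolate the growth pattern.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of getThreadStatistics(): a ThreadLocal caches per-thread data,
// and the first call on each thread also registers that data in a shared set.
public class PerThreadStats {
    static final class Data {
        long bytesWritten;
    }

    private final ThreadLocal<Data> threadData = new ThreadLocal<>();
    private final Set<Data> allData = new HashSet<>();

    public Data getThreadStatistics() {
        Data data = threadData.get();
        if (data == null) {
            // Only reached on the first call from a given thread.
            data = new Data();
            threadData.set(data);
            synchronized (this) {
                allData.add(data);
            }
        }
        return data;
    }

    public synchronized int registered() {
        return allData.size();
    }

    // Runs `threads` short-lived threads one after another; each calls
    // getThreadStatistics() `callsPerThread` times. Returns the number of entries.
    public static int simulate(int threads, int callsPerThread) {
        PerThreadStats stats = new PerThreadStats();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int c = 0; c < callsPerThread; c++) {
                    stats.getThreadStatistics();
                }
            });
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return stats.registered();
    }
}
```

`PerThreadStats.simulate(5, 3)` ends with 5 registered entries and `simulate(1, 10)` with 1: the set grows with the number of distinct threads that touch the instance, not with the number of calls. With threads that keep getting replaced, that count keeps rising unless something removes the entries.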
