[GitHub] spark pull request #19277: [SPARK-22058][CORE]the BufferedInputStream will n...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19277

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user zuotingbing commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r140137420

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
```
@@ -351,14 +351,14 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-    }
-
+    try {
+      val codec = codecName.map { c =>
+        codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
+      }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>
```
--- End diff ---

It catches everything, including both exceptions and errors.

---
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r140059471

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
```
@@ -351,14 +351,14 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-    }
-
+    try {
+      val codec = codecName.map { c =>
+        codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
+      }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>
```
--- End diff ---

What specific case do we want to catch here?

---
Github user zuotingbing commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139902219

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
```
@@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-    }
-
+    try {
+      val codec = codecName.map { c =>
+        codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
+      }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
       case e: Exception =>
```
--- End diff ---

Yes, though if an Error occurs, closing the BufferedInputStream is of little significance. Will fix as you said.

---
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139894343

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
```
@@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-    }
-
+    try {
+      val codec = codecName.map { c =>
```
--- End diff ---

While we're tightening, this whole body could be

```
codecName.map { c =>
  val codec = codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
  codec.compressedInputStream(in)
}.getOrElse(in)
```

---
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139894416

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
```
@@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
```
--- End diff ---

Maybe move the declaration of `in` to just before the try.

---
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139894546

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
```
@@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-    }
-
+    try {
+      val codec = codecName.map { c =>
+        codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
+      }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
       case e: Exception =>
```
--- End diff ---

I also note this doesn't handle Throwable, nor does a similar block earlier in the file. In the case of, say, an OutOfMemoryError, it wouldn't be closed.

---
Github user zuotingbing commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139878136

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
```
@@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-    }
-
+    try {
+      val codec = codecName.map { c =>
```
--- End diff ---

Because the CompressionCodec.createCodec function can throw an exception with the message "Codec [$codecName] is not available".

---
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139867429

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
```
@@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-    }
-
+    try {
+      val codec = codecName.map { c =>
+        codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
+      }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
```
--- End diff ---

Would it be better to move the line `val in = new BufferedInputStream(fs.open(log))` to here to solve your problem?

---
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139867369

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
```
@@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-    }
-
+    try {
+      val codec = codecName.map { c =>
```
--- End diff ---

Why would an exception be thrown here?

---
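The pattern the reviewers converge on can be sketched in isolation: declare the stream just before the try, and catch `Throwable` rather than `Exception` so the stream is also closed on Errors (e.g. OutOfMemoryError). This is a minimal, self-contained sketch, not Spark's actual code: `createCodec` and `openLogStream` are hypothetical stand-ins for `CompressionCodec.createCodec` and the body of `EventLoggingListener.openEventLog`.

```scala
import java.io.{ByteArrayInputStream, InputStream}

object CodecSketch {
  // Hypothetical stand-in for CompressionCodec.createCodec:
  // returns a wrapping function, or throws for an unknown codec name,
  // mirroring the "Codec [$codecName] is not available" failure discussed above.
  def createCodec(name: String): InputStream => InputStream = name match {
    case "lz4" => identity // pretend wrapper; a real codec would decorate the stream
    case other => throw new IllegalArgumentException(s"Codec [$other] is not available")
  }

  // Open the underlying stream just before the try; if wrapping it in a codec
  // stream fails for ANY reason (Exception or Error), close it before rethrowing.
  def openLogStream(codecName: Option[String], open: () => InputStream): InputStream = {
    val in = open()
    try {
      codecName.map(c => createCodec(c)(in)).getOrElse(in)
    } catch {
      case e: Throwable => // Throwable, not Exception, so Errors also trigger cleanup
        in.close()
        throw e
    }
  }
}
```

With an unknown codec name, `openLogStream(Some("zzz"), ...)` closes the stream it opened and rethrows, which is exactly the leak the PR fixes for `BufferedInputStream`.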