Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/15556#discussion_r84374330
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -43,38 +43,56 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
    * @param sourceName Filename (or other source identifier) from whence @logData is being read
    * @param maybeTruncated Indicates whether the log file might be truncated (in some abnormal
    *        situations, the log file might not have finished writing)
+   * @param eventsFilter Filter function to select JSON event strings in the log data stream that
+   *        should be parsed and replayed. When not specified, all event strings in the log data
+   *        are parsed and replayed.
*/
def replay(
logData: InputStream,
sourceName: String,
- maybeTruncated: Boolean = false): Unit = {
- var currentLine: String = null
- var lineNumber: Int = 1
+ maybeTruncated: Boolean = false,
+      eventsFilter: (String) => Boolean = ReplayListenerBus.SELECT_ALL_FILTER): Unit = {
try {
- val lines = Source.fromInputStream(logData).getLines()
- while (lines.hasNext) {
- currentLine = lines.next()
+ val lineEntries = Source.fromInputStream(logData)
+ .getLines()
+ .zipWithIndex
+ .filter(entry => eventsFilter(entry._1))
+
+ var entry: (String, Int) = ("", 0)
+
+ while (lineEntries.hasNext) {
try {
- postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
+ entry = lineEntries.next()
+ postToAll(JsonProtocol.sparkEventFromJson(parse(entry._1)))
} catch {
case jpe: JsonParseException =>
          // We can only ignore an exception from the last line of the file that might be truncated
- if (!maybeTruncated || lines.hasNext) {
+            // the last entry may not be the very last line in the event log, but we treat it
+            // as such in a best effort to replay the given input
+ if (!maybeTruncated || lineEntries.hasNext) {
throw jpe
} else {
logWarning(s"Got JsonParseException from log file
$sourceName" +
- s" at line $lineNumber, the file might not have finished
writing cleanly.")
+ s" at line number ${entry._2}, the file might not have
finished writing cleanly.")
}
+
+ case e: Exception =>
+ logError (s"Exception parsing Spark event log $sourceName" +
+ s" at line number: ${entry._2}", e)
+ throw e // quit processing the event log
}
- lineNumber += 1
}
} catch {
case ioe: IOException =>
throw ioe
case e: Exception =>
logError(s"Exception parsing Spark event log: $sourceName", e)
- logError(s"Malformed line #$lineNumber: $currentLine\n")
}
}
+}
+
+private[spark] object ReplayListenerBus {
+  // utility filter that selects all event log entries during replay
+ val SELECT_ALL_FILTER = (eventString: String) => true
--- End diff ---
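
For context, here is a minimal usage sketch of the new `eventsFilter` hook. The file path, the helper object name, and the event-name checks are assumptions for illustration, not part of the PR (and, since the class is `private[spark]`, a real caller would live inside the `org.apache.spark` package):

```scala
import java.io.{FileInputStream, InputStream}

import org.apache.spark.scheduler.ReplayListenerBus

// Hypothetical caller: replay only application start/end events from a
// possibly truncated log, skipping the JSON parse work for everything else.
object ReplayOnlyAppEvents {
  def main(args: Array[String]): Unit = {
    val in: InputStream = new FileInputStream("/tmp/eventLog")  // assumed path
    try {
      val bus = new ReplayListenerBus()
      bus.replay(in, sourceName = "eventLog", maybeTruncated = true,
        eventsFilter = (line: String) =>
          line.contains("SparkListenerApplicationStart") ||
            line.contains("SparkListenerApplicationEnd"))
    } finally {
      in.close()
    }
  }
}
```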
Instead of this `SELECT_ALL_FILTER` constant, I have a slight preference for an overloaded `replay` method that does not take a filter, or a default value for the filter argument.
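
For concreteness, a rough sketch of the overload alternative (an illustration of the suggestion, not code from the PR; note that Scala permits default arguments on only one of a set of overloaded alternatives, so the filtered variant would have to drop its defaults):

```scala
// Thin overload for the common no-filter case; delegates to the filtered
// variant with an accept-all predicate.
def replay(logData: InputStream, sourceName: String, maybeTruncated: Boolean = false): Unit = {
  replay(logData, sourceName, maybeTruncated, (_: String) => true)
}

// Primary method; defaults removed so the two overloads can coexist.
def replay(
    logData: InputStream,
    sourceName: String,
    maybeTruncated: Boolean,
    eventsFilter: String => Boolean): Unit = {
  // ... replay loop as in the diff above, applying eventsFilter to each line ...
}
```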